feat(torii): task network (dep graph) & processors #3172
Walkthrough
Ohayo sensei! This update introduces three new Rust crates under `crates/torii/`: `processors`, `adigraphmap`, and `task-network`.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Blockchain
    participant Provider
    participant Processors
    participant EventProcessor
    participant Database
    Blockchain->>Provider: Emits block/tx/event
    Provider->>Processors: Passes event/block/tx
    Processors->>EventProcessor: Selects processor by contract type & event key
    EventProcessor->>Database: Processes and stores result
    EventProcessor-->>Processors: Processing complete
```

```mermaid
sequenceDiagram
    participant TaskNetwork
    participant TaskHandler
    participant Semaphore
    loop For each topological level
        TaskNetwork->>Semaphore: Acquire permits for level
        par For each task in level
            TaskNetwork->>TaskHandler: Execute task async
            TaskHandler-->>TaskNetwork: Task result
        end
        Semaphore-->>TaskNetwork: Release permits
    end
    TaskNetwork-->>TaskHandler: All tasks processed
```
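As an aside, the level-by-level pattern in the second diagram can be sketched in plain `tokio` as follows — an illustration only, not the crate's actual `TaskNetwork` API; the `Task` and `run_task` names are hypothetical:

```rust
use std::sync::Arc;

use tokio::sync::Semaphore;

// Hypothetical task type, for illustration only.
struct Task {
    id: u64,
}

// Placeholder for a real task handler.
async fn run_task(task: Task) -> anyhow::Result<()> {
    println!("processed task {}", task.id);
    Ok(())
}

/// Runs tasks grouped by topological level: each level completes fully before
/// the next starts, and a semaphore bounds concurrency within a level.
async fn process_levels(
    levels: Vec<Vec<Task>>,
    max_concurrent_tasks: usize,
) -> anyhow::Result<()> {
    let semaphore = Arc::new(Semaphore::new(max_concurrent_tasks));

    for level in levels {
        let mut handles = Vec::with_capacity(level.len());
        for task in level {
            let semaphore = semaphore.clone();
            handles.push(tokio::spawn(async move {
                // A permit limits how many tasks of this level run at once.
                let _permit = semaphore.acquire().await?;
                run_task(task).await
            }));
        }
        // Finish the whole level before moving on, so dependencies from
        // earlier levels are always satisfied.
        for handle in handles {
            handle.await??;
        }
    }
    Ok(())
}
```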
Actionable comments posted: 26
♻️ Duplicate comments (2)
crates/torii/processors/src/lib.rs (2)
63-65: Bound adjustment identical to previous comment
`BlockProcessor` has the same missing `Send` bound; please align it with the fix suggested above.

75-77: Same `Send`-bound issue for `TransactionProcessor`
Replicate the provider bound change here as well.
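For reference, the shape of the fix being pointed at is just the extra `Send` bound on the provider generic — a sketch only, with the method list elided:

```rust
use async_trait::async_trait;
use starknet::providers::Provider;

// Sketch: `#[async_trait]` generates `Send` futures by default, so the provider
// generic needs `Send` as well. The same bound applies to `TransactionProcessor`.
#[async_trait]
pub trait BlockProcessor<P>: Send + Sync
where
    P: Provider + Send + Sync,
{
    // ...methods elided; only the bounds matter for this comment.
}
```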
🧹 Nitpick comments (32)
crates/torii/processors/src/processors/raw_event.rs (1)
36-49
: No-op process implementation is acceptable for now.The empty implementation with just
Ok(())
makes sense for a catch-all processor that doesn't need specific processing logic. The comment "We can choose to consider them, or not" suggests this design is intentional.However, you might want to consider adding more detailed logging or telemetry to track the volume and types of events being caught by this fallback processor. This could help identify patterns that might warrant dedicated processors in the future.
async fn process( &self, _world: &WorldContractReader<P>, _db: &mut Sql, _block_number: u64, _block_timestamp: u64, _event_id: &str, _event: &Event, _config: &EventProcessorConfig, ) -> Result<(), Error> { // We can choose to consider them, or not. + tracing::debug!( + target: "torii::indexer::processors::raw_event", + event_id = %_event_id, + from_address = ?_event.from_address, + keys = ?_event.keys, + data_len = %_event.data.len(), + "Processing unhandled event" + ); Ok(()) }crates/torii/processors/src/processors/erc20_legacy_transfer.rs (3)
52-61
: Validation logic is accurate for legacy ERC20 transfers.The validation correctly checks for 1 key and 4 data elements according to the legacy ERC20 contract format cited in the comment.
However, the return statement could be simplified:
- if event.keys.len() == 1 && event.data.len() == 4 { - return true; - } - - false + event.keys.len() == 1 && event.data.len() == 4
77-78
: Cairo deserialization could benefit from more robust error handling.The code deserializes the value but doesn't handle potential issues with malformed data beyond propagating the error.
- let value = U256Cainome::cairo_deserialize(&event.data, 2)?; - let value = U256::from_words(value.low, value.high); + let value = match U256Cainome::cairo_deserialize(&event.data, 2) { + Ok(v) => v, + Err(e) => { + tracing::warn!( + target: LOG_TARGET, + error = %e, + token_address = ?token_address, + from = ?from, + to = ?to, + "Failed to deserialize ERC20 transfer value" + ); + return Err(anyhow::anyhow!("Failed to deserialize ERC20 transfer value: {}", e)); + } + }; + let value = U256::from_words(value.low, value.high);
90-90
: Consider enhancing debug logging for transfers.The current debug log contains the basic transfer details, but could include the token address for completeness.
- debug!(target: LOG_TARGET,from = ?from, to = ?to, value = ?value, "Legacy ERC20 Transfer."); + debug!( + target: LOG_TARGET, + token_address = ?token_address, + from = ?from, + to = ?to, + value = ?value, + "Legacy ERC20 Transfer." + );crates/torii/processors/src/processors/erc1155_transfer_single.rs (1)
41-70
: Expose theoperator
for richer analytics (optional)We currently ignore
operator
(event.keys[1]
). Indexing that field would let downstream consumers distinguish between direct and approved transfers, which can be valuable for audit trails.+ let operator = event.keys[1]; let from = event.keys[2]; let to = event.keys[3]; db.handle_nft_transfer( token_address, + operator, // new from, to, token_id, amount, block_timestamp, event_id, )Adapting the DB signature is, of course, a wider change, so treat this as a nice‑to‑have.
crates/torii/processors/src/processors/erc4906_batch_metadata_update.rs (1)
58-62
: Consider a bounded iterator for huge rangesThe
while token_id <= to_token_id { …; token_id += 1 }
loop is correct but could run millions of iterations for gigantic collections, blocking other tasks.A simple ergonomic tweak keeps readability and avoids accidental infinite loops:
- let mut token_id = from_token_id; - while token_id <= to_token_id { - db.update_nft_metadata(token_address, token_id).await?; - token_id += U256::from(1u8); - } + for token_id in (from_token_id..=to_token_id).into_iter() { + db.update_nft_metadata(token_address, token_id).await?; + }Internally this still iterates, but the intent is clearer and less error‑prone.
crates/torii/processors/src/processors/erc4906_metadata_update.rs (1)
49-54
: Consider a lightweight pre‑flight guard for obviously invalid IDs.
If the on‑chain contract accidentally emitstoken_id == 0
for collections that start at 1, the DB will still be hit. A quick check (and earlyreturn Ok(())
) would protect against spurious writes without measurable overhead.+ // sanity‑check: ignore zero IDs which are invalid for many collections + if token_id.is_zero() { + debug!(target: LOG_TARGET, "Skipping zero token_id"); + return Ok(()); + }crates/torii/processors/src/processors/erc20_transfer.rs (2)
44-56: Hashing strategy groups opposite-direction transfers but ignores amount.
Using `max(from, to)` makes A→B and B→A share a task, which is great, but two concurrent transfers A→B for different amounts will still serialize because they hash to the same task-id. If throughput becomes an issue, add the block number or event index to the hash to widen the distribution.
No immediate change required, just something to keep in mind while benchmarking.
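If benchmarking ever shows this matters, one illustrative way to widen the distribution (the extra discriminator would have to be supplied by the caller, since `Event` alone does not carry a block number) is:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

use starknet::core::types::Felt;

// Illustrative helper only: folds an extra discriminator (e.g. a block number
// or event index) into the hash alongside the order-independent (from, to) pair.
fn transfer_task_id(from: Felt, to: Felt, discriminator: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    // Hash the pair in a stable order so A->B and B->A still share an id.
    let (low, high) = if from < to { (from, to) } else { (to, from) };
    low.hash(&mut hasher);
    high.hash(&mut hasher);
    discriminator.hash(&mut hasher);
    hasher.finish()
}
```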
85-85
: Nit: missing space after first key indebug!
macro.
Log output currently renders as..,from=0xabc, to = ...
(note the space asymmetry). Tiny, but consistent formatting helps log scrapers.- debug!(target: LOG_TARGET,from = ?from, to = ?to, value = ?value, "ERC20 Transfer."); + debug!(target: LOG_TARGET, from = ?from, to = ?to, value = ?value, "ERC20 Transfer.");crates/torii/processors/src/processors/store_update_member.rs (1)
31-33: Ohayo sensei! `validate()` always succeeds – tighten the checks
Currently any event passes through, even if the key/data layout is malformed, which may later cause panics deeper in the pipeline. Consider validating:

- `event.keys.len() == 4` (hash, model selector, entity id, member selector)
- `!event.data.is_empty()` – there should be at least one felt containing the new value(s)

This guards early and lets you surface a clean error instead of crashing during deserialization.
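A sketch of the tightened check, as it would sit inside the processor impl (key layout taken from the list above):

```rust
fn validate(&self, event: &Event) -> bool {
    // keys: [event hash, model selector, entity id, member selector]
    // data: at least one felt carrying the new member value(s)
    event.keys.len() == 4 && !event.data.is_empty()
}
```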
crates/torii/processors/src/processors/erc1155_transfer_batch.rs (2)
27-33: Ohayo sensei! Strengthen `validate()` for malformed data
`event.data.is_empty()` being false doesn't guarantee the array is long enough to contain `ids_len * 2 + values_len * 2 + 2` felts. A stricter check (e.g. at least 3 felts) will let you return early and avoid out-of-bounds reads later.
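For example (a sketch; the four-key layout mirrors the `TransferSingle` processor reviewed earlier, and the 3-felt minimum follows the suggestion above — both are assumptions, not verified against the contract ABI):

```rust
fn validate(&self, event: &Event) -> bool {
    // keys: [selector, operator, from, to] (assumed, as for TransferSingle)
    // data: ids_len, ids..., values_len, values... — require at least the two
    // length felts plus one element.
    event.keys.len() == 4 && event.data.len() >= 3
}
```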
38-40
: Ohayo sensei! Consider finer‑grainedTaskId
for better parallelism
Using the global sequential ID forces all ERC‑1155 batch transfers to serialize. Hashing(token_address, from, to)
(or even(token_address, token_id)
) would still guarantee ordering per NFT while allowing unrelated collections to process in parallel, improving throughput.crates/torii/processors/src/processors/erc721_legacy_transfer.rs (2)
99-100
: Ohayo sensei! Clarify log message for legacy path
The debug string says"ERC721 Transfer."
, which is identical to the non‑legacy processor and can confuse log consumers. Append “legacy” to make grepping easier.-debug!(target: LOG_TARGET, from = ?from, to = ?to, token_id = ?token_id, "ERC721 Transfer."); +debug!(target: LOG_TARGET, from = ?from, to = ?to, token_id = ?token_id, "Legacy ERC721 Transfer.");
72-98
: Ohayo sensei! Duplicate logic witherc721_transfer.rs
– extract a helper
Erc721LegacyTransferProcessor
and the modern variant share almost identical hashing and DB‑write logic. Extracting a small util (e.g.compute_erc721_task_id
,handle_erc721_transfer
) would reduce duplication and ensure future fixes land in one spot.crates/torii/processors/src/processors/erc721_transfer.rs (2)
50-55
: Ohayo sensei – potential hash collisions in canonical address pairingUsing only
std::cmp::max(from, to)
drops half of the information:
pairs(A, B)
and(C, B)
both compress toB
whenB
is the larger address, increasing the chance of unrelated transfers sharing the same task‑id and therefore being processed sequentially.Consider hashing both addresses in a stable order instead:
- let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]); - canonical_pair.hash(&mut hasher); + let (low, high) = if event.keys[1] < event.keys[2] { + (event.keys[1], event.keys[2]) + } else { + (event.keys[2], event.keys[1]) + }; + low.hash(&mut hasher); + high.hash(&mut hasher);
80-82
: Ohayo sensei – deserialisation helper can panic on malformed events
U256Cainome::cairo_deserialize(&event.keys, 3)?
will bubble an error that results in the whole block failing.
You may want to replace?
with graceful logging +return Ok(())
to skip malformed transfers instead of aborting indexing.crates/torii/processors/src/processors/upgrade_event.rs (1)
70-76
: Ohayo sensei – avoid shadowing the term “model” for eventsThe comment hints at future renaming but the variable and log messages still use “model”, which is confusing when processing events.
Renaming now will save a lot of grep‑pain later.- let model = match db.model(event.selector).await { + let stored_event = match db.model(event.selector).await {crates/torii/processors/src/processors/store_set_record.rs (2)
38-43
: Ohayo sensei – possible out‑of‑bounds intask_identifier
event.keys[1]
andevent.keys[2]
assume ≥ 3 keys. If the upstream contract ever changes or emits a malformed log, this will panic beforevalidate
has a chance to filter. Either guard length or reuse the improvedvalidate
suggested above.-fn task_identifier(&self, event: &Event) -> TaskId { +fn task_identifier(&self, event: &Event) -> TaskId { + if event.keys.len() < 3 { + return TASK_ID_SEQUENTIAL; // fall back to sequential processing + } let mut hasher = DefaultHasher::new(); event.keys[1].hash(&mut hasher); event.keys[2].hash(&mut hasher); hasher.finish() }
100-104
: Ohayo sensei – clarifykeys_and_unpacked
ordering
The concatenation order[keys, values]
is subtle; a quick comment or helper makes maintenance easier and prevents accidental re‑ordering in future edits.crates/torii/processors/src/processors/upgrade_model.rs (3)
30-34
: Ohayo sensei – unusedvalidate
compromise clarity
Same as the previous file: unconditionaltrue
defeats the purpose of the hook and can hide malformed events. Consider removing the method or adding real checks.
40-44
: Ohayo sensei – hashing all keys may over‑serializeTaskId
If the event has many keys, this defeats deduplication and can scatter logically‑related upgrades across tasks. Hashing only the selector (event.keys[1]
) would suffice and group upgrades per‑model.
92-97
: Ohayo sensei – blocking call risk inside async context
model.set_block(...).await;
performs an on‑chain RPC for every upgrade event. In a high‑traffic chain this may turn the event loop into sequential IO. Consider batching or caching results between events of the same block.crates/torii/processors/src/task_manager.rs (1)
74-87
: Ohayo sensei – semaphore created once, but tasks span multiple priorities
Creating the semaphore outside the priority loop is great; double‑checkmax_concurrent_tasks
sizing so high‑priority floods don’t starve lower ones (there’s no fairness re‑queue). A comment explaining this policy would help future maintainers.crates/torii/processors/src/processors/controller.rs (2)
26-52
: Ohayo sensei —lazy_static
is overkill for a plain constant
CARTRIDGE_MAGIC
is fully known at compile‑time, so aconst
(orstatic
withoutlazy_static!
) will avoid an extra runtime init guard and removes the macro dependency.-use lazy_static::lazy_static; - -lazy_static! { - pub(crate) static ref CARTRIDGE_MAGIC: [Felt; 22] = [ +/// Felt representation of "https://x.cartridge.gg/" +pub(crate) const CARTRIDGE_MAGIC: [Felt; 22] = [ … - ]; -} +];You can now drop the
lazy_static
import.
82-85
: Validation comment contradicts the check
The comment says “has no keys” but the code enforceslen() == 1
. Please clarify which is correct for aContractDeployed
UDC event—otherwise fellow devs (and future you) will be confused.- // ContractDeployed event has no keys and contains username in data - event.keys.len() == 1 && !event.data.is_empty() + // UDC `ContractDeployed` emits exactly one key (class hash) + event.keys.len() == 1 && !event.data.is_empty()crates/torii/processors/src/processors/metadata_update.rs (2)
35-37
: No sanity‑check invalidate
– intentional?
validate
always returnstrue
. If that is deliberate, a short comment will prevent future attempts to “fix” it.fn validate(&self, _event: &Event) -> bool { // All world events are well‑formed, no additional filters needed. true }
87-90
: Detached Tokio task may outlive DB connection
tokio::spawn
moves a clonedSql
, but if the node shuts down before the task finishes you may leak in‑flight work. Consider usingtokio::task::spawn_blocking
with aJoinSet
or hand the job to the task manager so graceful shutdown awaits completion.crates/torii/processors/src/processors/store_transaction.rs (2)
180-211
: Magic offsets for v2/v3 outside calls feel brittle
Hard‑coding indices (calldata[5]
,calldata[4]
) ties the parser to a single compiler version. Consider parsing using the struct layout (e.g., via CairoSerde) or at least centralise the offsets asconst
with a unit test.
284-303
: Silent fallback when execute deserialization fails
If both deserializers fail we silently store an empty call list. This hides bugs and makes debugging harder. Emit awarn!
so operators know a transaction was skipped.crates/torii/processors/src/lib.rs (3)
44-46: Avoid intermediate allocation when joining event keys
Collecting into a `Vec<String>` allocates one `String` per key plus the vector itself.
A small tweak avoids both allocations:

```diff
- event.keys.iter().map(|i| format!("{:#064x}", i)).collect::<Vec<_>>().join(",")
+ event
+     .keys
+     .iter()
+     .map(|i| format!("{:#064x}", i))
+     .collect::<std::borrow::Cow<str>>()
```

Alternatively, use `itertools::join` or `fmt::Write` with a pre-allocated `String`.
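The `fmt::Write` route could look roughly like this (a sketch, assuming the keys are `Felt`s formatted exactly as in the current code):

```rust
use std::fmt::Write as _;

use starknet::core::types::Felt;

fn join_keys(keys: &[Felt]) -> String {
    // "0x" + 64 hex digits per key, plus a comma separator.
    let mut out = String::with_capacity(keys.len() * 67);
    for (i, key) in keys.iter().enumerate() {
        if i > 0 {
            out.push(',');
        }
        // Writing into a String is infallible, so the result can be ignored.
        let _ = write!(out, "{:#064x}", key);
    }
    out
}
```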
23-25
: Potential namespace‑case mismatch
HashSet<String>
is case‑sensitive. If namespaces come from user input or chain data with inconsistent casing, indexing may silently skip events. Consider normalising to lowercase on insertion & lookup or documenting the expectation.
28-34
: API ergonomics ofTaskProcessor
dependencies
currently allocates a newVec
on every call. If the set of dependencies is static per processor, returning a slice or iterator would avoid repeated allocations:- fn dependencies(&self) -> Vec<TaskId>; + fn dependencies(&self) -> &[TaskId];This requires small adjustments in call‑sites but improves performance and conveys immutability.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (25)
crates/torii/processors/Cargo.toml
(1 hunks)crates/torii/processors/src/lib.rs
(1 hunks)crates/torii/processors/src/processors/controller.rs
(1 hunks)crates/torii/processors/src/processors/erc1155_transfer_batch.rs
(1 hunks)crates/torii/processors/src/processors/erc1155_transfer_single.rs
(1 hunks)crates/torii/processors/src/processors/erc20_legacy_transfer.rs
(1 hunks)crates/torii/processors/src/processors/erc20_transfer.rs
(1 hunks)crates/torii/processors/src/processors/erc4906_batch_metadata_update.rs
(1 hunks)crates/torii/processors/src/processors/erc4906_metadata_update.rs
(1 hunks)crates/torii/processors/src/processors/erc721_legacy_transfer.rs
(1 hunks)crates/torii/processors/src/processors/erc721_transfer.rs
(1 hunks)crates/torii/processors/src/processors/event_message.rs
(1 hunks)crates/torii/processors/src/processors/metadata_update.rs
(1 hunks)crates/torii/processors/src/processors/mod.rs
(1 hunks)crates/torii/processors/src/processors/raw_event.rs
(1 hunks)crates/torii/processors/src/processors/register_event.rs
(1 hunks)crates/torii/processors/src/processors/register_model.rs
(1 hunks)crates/torii/processors/src/processors/store_del_record.rs
(1 hunks)crates/torii/processors/src/processors/store_set_record.rs
(1 hunks)crates/torii/processors/src/processors/store_transaction.rs
(1 hunks)crates/torii/processors/src/processors/store_update_member.rs
(1 hunks)crates/torii/processors/src/processors/store_update_record.rs
(1 hunks)crates/torii/processors/src/processors/upgrade_event.rs
(1 hunks)crates/torii/processors/src/processors/upgrade_model.rs
(1 hunks)crates/torii/processors/src/task_manager.rs
(1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (5)
crates/torii/processors/src/processors/raw_event.rs (7)
crates/torii/processors/src/lib.rs (5)
event_key
(42-42)validate
(48-48)process
(51-60)process
(66-72)process
(78-88)crates/torii/processors/src/processors/erc20_transfer.rs (5)
event_key
(25-27)validate
(29-38)task_priority
(40-42)task_identifier
(44-56)process
(58-88)crates/torii/processors/src/processors/event_message.rs (5)
event_key
(27-29)validate
(31-33)task_priority
(35-37)task_identifier
(39-50)process
(52-97)crates/torii/processors/src/processors/store_del_record.rs (4)
event_key
(25-27)validate
(29-31)task_priority
(33-35)task_identifier
(37-42)crates/torii/processors/src/processors/store_update_member.rs (2)
task_priority
(35-37)task_identifier
(39-46)crates/torii/processors/src/processors/store_update_record.rs (2)
task_priority
(34-36)task_identifier
(38-45)crates/torii/processors/src/processors/upgrade_event.rs (2)
task_priority
(36-38)task_identifier
(40-44)
crates/torii/processors/src/processors/erc4906_metadata_update.rs (6)
crates/torii/processors/src/lib.rs (6)
event_key
(42-42)validate
(48-48)event
(45-45)process
(51-60)process
(66-72)process
(78-88)crates/torii/processors/src/processors/erc1155_transfer_single.rs (5)
event_key
(23-25)validate
(27-31)task_priority
(33-35)task_identifier
(37-39)process
(41-74)crates/torii/processors/src/processors/erc20_transfer.rs (5)
event_key
(25-27)validate
(29-38)task_priority
(40-42)task_identifier
(44-56)process
(58-88)crates/torii/processors/src/processors/erc4906_batch_metadata_update.rs (5)
event_key
(23-25)validate
(27-31)task_priority
(33-35)task_identifier
(37-39)process
(41-73)crates/torii/processors/src/processors/erc721_transfer.rs (5)
event_key
(25-27)validate
(29-38)task_priority
(40-42)task_identifier
(44-64)process
(66-96)crates/torii/processors/src/processors/metadata_update.rs (5)
event_key
(31-33)validate
(35-37)task_priority
(39-41)task_identifier
(43-47)process
(49-93)
crates/torii/processors/src/processors/erc721_legacy_transfer.rs (8)
crates/torii/processors/src/processors/erc20_legacy_transfer.rs (4)
contract_type
(44-46)event_key
(48-50)validate
(52-61)process
(63-93)crates/torii/processors/src/processors/erc1155_transfer_batch.rs (5)
event_key
(23-25)validate
(27-32)task_priority
(34-36)task_identifier
(38-40)process
(42-111)crates/torii/processors/src/processors/erc1155_transfer_single.rs (5)
event_key
(23-25)validate
(27-31)task_priority
(33-35)task_identifier
(37-39)process
(41-74)crates/torii/processors/src/processors/erc4906_metadata_update.rs (5)
event_key
(22-24)validate
(26-29)task_priority
(31-33)task_identifier
(35-37)process
(39-63)crates/torii/processors/src/processors/erc721_transfer.rs (5)
event_key
(25-27)validate
(29-38)task_priority
(40-42)task_identifier
(44-64)process
(66-96)crates/torii/processors/src/processors/store_del_record.rs (4)
event_key
(25-27)validate
(29-31)task_priority
(33-35)task_identifier
(37-42)crates/torii/processors/src/processors/store_set_record.rs (4)
event_key
(26-28)validate
(30-32)task_priority
(34-36)task_identifier
(38-43)crates/torii/processors/src/processors/store_update_record.rs (2)
task_priority
(34-36)task_identifier
(38-45)
crates/torii/processors/src/task_manager.rs (8)
crates/torii/processors/src/processors/erc20_transfer.rs (2)
task_identifier
(44-56)event_key
(25-27)crates/torii/processors/src/processors/erc721_transfer.rs (2)
task_identifier
(44-64)event_key
(25-27)crates/torii/processors/src/processors/event_message.rs (2)
task_identifier
(39-50)event_key
(27-29)crates/torii/processors/src/processors/metadata_update.rs (2)
task_identifier
(43-47)event_key
(31-33)crates/torii/processors/src/processors/raw_event.rs (2)
task_identifier
(31-34)event_key
(19-21)crates/torii/processors/src/processors/register_event.rs (2)
task_identifier
(40-44)event_key
(26-28)crates/torii/processors/src/processors/store_set_record.rs (1)
task_identifier
(38-43)crates/torii/processors/src/lib.rs (3)
contract_type
(41-41)event
(45-45)event_key
(42-42)
crates/torii/processors/src/processors/mod.rs (6)
crates/dojo/world/src/contracts/abigen/world.rs (3)
register_event
(3854-3869)upgrade_event
(4245-4260)upgrade_model
(4280-4295)crates/torii/sqlite/sqlite/src/lib.rs (1)
store_transaction
(517-556)crates/torii/processors/src/lib.rs (1)
contract_type
(41-41)crates/torii/processors/src/processors/controller.rs (1)
contract_type
(74-76)crates/torii/processors/src/processors/erc20_legacy_transfer.rs (1)
contract_type
(44-46)crates/torii/processors/src/processors/erc721_legacy_transfer.rs (1)
contract_type
(27-29)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: build
🔇 Additional comments (17)
crates/torii/processors/Cargo.toml (2)
1-8
: Ohayo Sensei! The package configuration looks good.The crate metadata is well-structured and leverages workspace configuration. Good job on creating a clean and concise package definition.
9-17
: Dependencies look complete and appropriate.The dependencies include all necessary components for processing blockchain events - database access through
torii-sqlite
, blockchain interaction viadojo-world
andstarknet
, async support withtokio
andasync-trait
, and utilities likeanyhow
,tracing
, andcainome
for error handling, logging, and serialization.crates/torii/processors/src/processors/raw_event.rs (4)
11-12
: The default processor structure looks good, sensei.The empty struct with
Default
andDebug
traits is appropriate for this basic processor implementation.
19-21
: Empty event key for catch-all processor - intentional design choice?Using an empty string as the event key suggests this processor is meant to catch all events that don't match other processors.
Is this intentional to serve as a fallback processor for events not handled by other specialized processors?
23-25
: Always validates events - appropriate for catch-all processor.This behavior ensures that all events will be accepted by this processor, which aligns with its apparent purpose as a fallback processor.
31-34
:❓ Verification inconclusive
TODO for raw event parallelization.
Currently using sequential task ID which means raw events won't be processed in parallel.
Would you like me to create an issue to track this parallelization enhancement for raw events?
🏁 Script executed:

```bash
#!/bin/bash
# Check if there's a GitHub issue tracking this TODO
gh issue list --search "raw events parallelization" --repo "$(git remote get-url origin | sed 's/.*github.com\///' | sed 's/\.git//')"
```

Length of output: 152
No existing GitHub issue for raw event parallelization found
ohayo sensei! I searched for an issue tracking the “raw events parallelization” TODO in
crates/torii/processors/src/processors/raw_event.rs
(lines 31–34) and didn’t find one. Could you double‑check if a ticket already exists, or should we open a new GitHub issue to track this enhancement?crates/torii/processors/src/processors/register_model.rs (3)
36-38
: Highest task priority (0) for model registration - excellent choice, sensei!Assigning the highest priority (0) to model registration is appropriate since models need to be registered before any events that might reference them.
80-85
: Conditional block context setting looks good.Setting the block context only when strict model reading is enabled is a good approach. This provides flexibility while ensuring accurate data retrieval in strict mode.
92-109
: Comprehensive logging implementation - excellent work, sensei!The logging is well-structured with different levels (info for high-level events, debug for details) and includes all relevant information. The use of structured logging attributes will make filtering and analyzing logs much easier.
crates/torii/processors/src/processors/erc20_legacy_transfer.rs (1)
22-36
: Task processing implementation looks solid.The processor correctly implements the
TaskProcessor
trait with no dependencies and a task identifier based on the contract address, which makes sense for grouping transfers from the same token contract.crates/torii/processors/src/processors/erc1155_transfer_single.rs (1)
27-31
: Ohayo sensei — validation looks solid!The key & data‑length check accurately guards against malformed
TransferSingle
events. Nice and concise.crates/torii/processors/src/processors/erc4906_metadata_update.rs (1)
26-29
: Ohayo sensei! Validation logic looks spot‑on.
The key & data length check correctly distinguishes a single‑token ERC‑4906 update from the batch variant – nice attention to the spec.crates/torii/processors/src/processors/store_update_record.rs (1)
103-112
: Key‑field pruning mutates sharedTy
in‑place – clone first to avoid surprises.
If another processor still holds a reference to the same schema instance, mutatingchildren
here could cause hard‑to‑trace bugs. Cloning before mutation isolates the change:- let mut entity = model.schema; + let mut entity = model.schema.clone();crates/torii/processors/src/processors/mod.rs (1)
116-121
: Ohayo sensei – return type ofget_event_processors
no longer matches map key/valueAfter changing the map, update the accessor accordingly and drop the hard‑coded
Felt
key that currently does not compile:- ) -> &HashMap<Felt, Vec<Box<dyn EventProcessor<P>>>> { + ) -> &EventProcessorMap<P> {You may also want to return an
Option
rather than panicking withunwrap()
.crates/torii/processors/src/processors/store_set_record.rs (1)
30-32: Ohayo sensei – `validate` is a no-op, consider real guards
Returning `true` unconditionally means the later logic may happily index `event.keys[1]` & `event.keys[2]` without checking vector length or semantics. A cheap upfront guard (e.g. `event.keys.len() >= 3`) would turn potential panics into early, recoverable errors.
Would you like a small patch to tighten this?
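Such a patch could be as small as this, dropped into the `EventProcessor` impl (key layout per the comment above):

```rust
fn validate(&self, event: &Event) -> bool {
    // keys: [hash(StoreSetRecord), selector, entity_id, ...]
    event.keys.len() >= 3
}
```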
crates/torii/processors/src/processors/upgrade_model.rs (1)
70-82
: Ohayo sensei – error filtering misses namespace gating
When a model is missing you ignore all namespaces, not just the filtered subset (config.namespaces
). Align withStoreSetRecordProcessor
to avoid indexing noise when namespace filtering is active.crates/torii/processors/src/processors/controller.rs (1)
131-133
: Uniqueness/duplication handling for controllers
db.add_controller
is called unconditionally. If the same address is redeployed (reorg or replay) will this violate a unique constraint? Consider making the DB insert idempotent or usingINSERT … ON CONFLICT DO NOTHING
.
let event = match WorldEvent::try_from(event).unwrap_or_else(|_| { | ||
panic!( | ||
"Expected {} event to be well formed.", | ||
<RegisterModelProcessor as EventProcessor<P>>::event_key(self) | ||
) | ||
}) { | ||
WorldEvent::ModelRegistered(e) => e, | ||
_ => { | ||
unreachable!() | ||
} | ||
}; |
🛠️ Refactor suggestion
Unwrap usage in event deserialization could be improved.
The code uses unwrap_or_else
with a panic, which is risky in production code. While the comment explains why this should be safe, it would be more robust to handle the error gracefully.
- let event = match WorldEvent::try_from(event).unwrap_or_else(|_| {
- panic!(
- "Expected {} event to be well formed.",
- <RegisterModelProcessor as EventProcessor<P>>::event_key(self)
- )
- }) {
+ let event = match WorldEvent::try_from(event) {
+ Ok(event) => event,
+ Err(e) => {
+ tracing::error!(
+ target: LOG_TARGET,
+ error = %e,
+ "Failed to parse {} event. This indicates a version mismatch between Torii and World.",
+ <RegisterModelProcessor as EventProcessor<P>>::event_key(self)
+ );
+ return Err(anyhow::anyhow!(
+ "Failed to parse {} event: {}",
+ <RegisterModelProcessor as EventProcessor<P>>::event_key(self),
+ e
+ ));
+ }
+ };
Committable suggestion skipped: line range outside the PR's diff.
```rust
let namespace = event.namespace.to_string().unwrap();
let name = event.name.to_string().unwrap();
```
🛠️ Refactor suggestion
Unwrapped string conversions should handle potential errors.
Similar to the previous comment, the unwrap
calls for string conversion could be handled more gracefully.
- let namespace = event.namespace.to_string().unwrap();
- let name = event.name.to_string().unwrap();
+ let namespace = event.namespace.to_string().map_err(|_| {
+ anyhow::anyhow!("Failed to convert namespace to string")
+ })?;
+ let name = event.name.to_string().map_err(|_| {
+ anyhow::anyhow!("Failed to convert model name to string")
+ })?;
Committable suggestion skipped: line range outside the PR's diff.
fn validate(&self, _event: &Event) -> bool { | ||
true | ||
} | ||
|
||
fn task_priority(&self) -> TaskPriority { | ||
2 | ||
} | ||
|
||
fn task_identifier(&self, event: &Event) -> TaskId { | ||
let mut hasher = DefaultHasher::new(); | ||
event.keys[1].hash(&mut hasher); | ||
event.keys[2].hash(&mut hasher); | ||
hasher.finish() | ||
} |
Guard against out‑of‑bounds panics in task_identifier
task_identifier
blindly indexes event.keys[1]
and event.keys[2]
, yet validate
unconditionally returns true
. A malformed or truncated event would panic the whole indexer before you even hit process
. Let’s harden validation:
- fn validate(&self, _event: &Event) -> bool {
- true
+ fn validate(&self, event: &Event) -> bool {
+ // keys: [hash(StoreDelRecord), selector, entity_id]
+ event.keys.len() >= 3 && event.data.is_empty()
}
If stronger guarantees are ever relaxed upstream, this small check keeps the node alive.
let event = match WorldEvent::try_from(event).unwrap_or_else(|_| { | ||
panic!( | ||
"Expected {} event to be well formed.", | ||
<StoreDelRecordProcessor as EventProcessor<P>>::event_key(self) | ||
) | ||
}) { | ||
WorldEvent::StoreDelRecord(e) => e, | ||
_ => { | ||
unreachable!() | ||
} | ||
}; |
🛠️ Refactor suggestion
Replace panic!
with graceful error propagation
A malformed event will currently panic!
and bring down the runtime. Bubble the error instead so higher‑level supervisors can decide whether to skip or retry.
- let event = match WorldEvent::try_from(event).unwrap_or_else(|_| {
- panic!(
- "Expected {} event to be well formed.",
- <StoreDelRecordProcessor as EventProcessor<P>>::event_key(self)
- )
- }) {
+ let event = match WorldEvent::try_from(event).map_err(|e| {
+ anyhow::anyhow!(
+ "Failed to decode {} event: {e}",
+ <StoreDelRecordProcessor as EventProcessor<P>>::event_key(self)
+ )
+ })? {
This preserves observability while keeping the processor fault‑tolerant.
fn validate(&self, _event: &Event) -> bool { | ||
true | ||
} | ||
|
||
fn task_priority(&self) -> TaskPriority { | ||
1 | ||
} | ||
|
||
fn task_identifier(&self, event: &Event) -> TaskId { | ||
let mut hasher = DefaultHasher::new(); | ||
let keys = Vec::<Felt>::cairo_deserialize(&event.data, 0).unwrap_or_else(|e| { | ||
panic!("Expected EventEmitted keys to be well formed: {:?}", e); | ||
}); | ||
// selector | ||
event.keys[1].hash(&mut hasher); | ||
// entity id | ||
let entity_id = poseidon_hash_many(&keys); | ||
entity_id.hash(&mut hasher); | ||
hasher.finish() | ||
} |
Ohayo sensei — make validate
reflect reality to avoid panics
task_identifier
deserialises event.data
and will panic!
if the layout is wrong, yet validate
always returns true
. Harden it:
- fn validate(&self, _event: &Event) -> bool {
- true
+ fn validate(&self, event: &Event) -> bool {
+ // Expect at least one felt to decode the keys vector length.
+ !event.data.is_empty()
}
Alternatively, keep validate
permissive but handle the error inside task_identifier
without panicking.
This small guard saves the whole processor from crashing on malicious / corrupt data.
```rust
async fn try_retrieve(mut db: Sql, resource: Felt, uri_str: String) {
    match metadata(uri_str.clone()).await {
        Ok((metadata, icon_img, cover_img)) => {
            db.update_metadata(&resource, &uri_str, &metadata, &icon_img, &cover_img).unwrap();
            info!(
                target: LOG_TARGET,
                resource = %format!("{:#x}", resource),
                "Updated resource metadata from ipfs."
            );
        }
        Err(e) => {
            error!(
                target: LOG_TARGET,
                resource = %format!("{:#x}", resource),
                uri = %uri_str,
                error = %e,
                "Retrieving resource uri."
            );
        }
    }
}
```
Multiple `unwrap()` calls inside background task
`update_metadata(...).unwrap()` and several `?` operators will panic within the spawned task, bypassing the `Result` handling of the processor. Wrap in `if let Err(e) = … { error!(…) }` to keep the indexer alive.
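A sketch of the panic-free variant, reusing the names from the snippet above (only the error handling changes):

```rust
async fn try_retrieve(mut db: Sql, resource: Felt, uri_str: String) {
    let (metadata, icon_img, cover_img) = match metadata(uri_str.clone()).await {
        Ok(res) => res,
        Err(e) => {
            error!(
                target: LOG_TARGET,
                resource = %format!("{:#x}", resource),
                uri = %uri_str,
                error = %e,
                "Retrieving resource uri."
            );
            return;
        }
    };

    // Log and bail instead of unwrapping, so a DB error cannot panic the task.
    if let Err(e) = db.update_metadata(&resource, &uri_str, &metadata, &icon_img, &cover_img) {
        error!(
            target: LOG_TARGET,
            resource = %format!("{:#x}", resource),
            error = %e,
            "Updating resource metadata."
        );
        return;
    }

    info!(
        target: LOG_TARGET,
        resource = %format!("{:#x}", resource),
        "Updated resource metadata from ipfs."
    );
}
```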
```rust
let uri_str = event.uri.to_string().unwrap();
info!(
```
unwrap()
here can bubble a panic from malformed URIs
A bad URI emitted by a buggy contract (or corrupted node) will crash the processor. Return an error instead:
-let uri_str = event.uri.to_string().unwrap();
+let Some(uri_str) = event.uri.to_string() else {
+ return Err(Error::msg("Invalid URI in MetadataUpdate event"));
+};
Committable suggestion skipped: line range outside the PR's diff.
```rust
    let to_offset = base_offset;
    let selector_offset = to_offset + 1;
    let calldata_offset = selector_offset + 2;
    let calldata_len: usize = calldata[selector_offset + 1].try_into().unwrap();
    let contract_address = calldata[to_offset];

    let contract_class =
        contract_class_cache.get(contract_address, BlockId::Tag(BlockTag::Pending)).await?;

    let entrypoint = get_entrypoint_name_from_class(&contract_class, calldata[selector_offset])
        .unwrap_or(format!("{:#x}", calldata[selector_offset]));

    // Calculate next offset: current offset + contract_address + selector + calldata_len +
    // calldata
    let next_offset = calldata_offset + calldata_len;

    Ok((
```
🛠️ Refactor suggestion
`parse_outside_call` assumes perfect calldata – can panic
Several unchecked index reads (`calldata[selector_offset + 1]`, etc.). Add length guards to avoid panics on malformed input.
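A minimal guard before the unchecked reads might look like this (a sketch; it assumes the offset variables shown above and the `anyhow`-based `Result` used in this file):

```rust
// Make sure the frame header (to, selector, calldata_len) is actually present.
if calldata.len() <= selector_offset + 1 {
    return Err(anyhow::anyhow!(
        "outside call frame truncated: only {} felts available",
        calldata.len()
    ));
}
let calldata_len: usize = calldata[selector_offset + 1]
    .try_into()
    .map_err(|_| anyhow::anyhow!("calldata length does not fit in usize"))?;
// And that the declared calldata actually fits in the buffer.
if calldata.len() < calldata_offset + calldata_len {
    return Err(anyhow::anyhow!("outside call calldata out of bounds"));
}
```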
async fn parse_legacy_execute_call<P: Provider + Send + Sync + std::fmt::Debug>( | ||
contract_class_cache: &ContractClassCache<P>, | ||
call: &LegacyExecuteCall, | ||
full_calldata: &[Felt], | ||
caller_address: Felt, | ||
call_type: CallType, | ||
) -> Result<ParsedCall> { | ||
let contract_class = contract_class_cache | ||
.get(call.contract_address, BlockId::Tag(BlockTag::Pending)) | ||
.await?; | ||
|
||
let entrypoint = get_entrypoint_name_from_class(&contract_class, call.selector) | ||
.unwrap_or(format!("{:#x}", call.selector)); | ||
|
||
Ok(ParsedCall { | ||
contract_address: call.contract_address, | ||
entrypoint, | ||
calldata: full_calldata[call.data_offset..call.data_offset + call.data_length].to_vec(), | ||
call_type, | ||
caller_address, | ||
}) | ||
} |
Bounds‑check before slicing legacy calldata
full_calldata[off..off+len]
will panic if the values are wrong. Safeguard it, sensei:
-let data = full_calldata[call.data_offset..call.data_offset + call.data_length].to_vec();
+if call.data_offset + call.data_length > full_calldata.len() {
+ return Err(anyhow::anyhow!("Legacy calldata slice out of bounds"));
+}
+let data = full_calldata[call.data_offset..call.data_offset + call.data_length].to_vec();
Committable suggestion skipped: line range outside the PR's diff.
```rust
#[async_trait]
pub trait EventProcessor<P>: Send + Sync
where
    P: Provider + Sync,
{
```
Ohayo sensei – Add Send
bound to provider generic to satisfy async_trait
default
#[async_trait]
produces a Send
future by default.
Because P
is awaited across .process()
, the compiler will complain if P
is not Send
.
Either add the bound to all processor traits or switch to #[async_trait(?Send)]
(less common).
-where
- P: Provider + Sync,
+where
+ P: Provider + Send + Sync,
Apply the same change to BlockProcessor
and TransactionProcessor
for consistency.
Failing to do so will cause a compile‑time error once a non‑Send
provider is used.
Codecov Report
All modified and coverable lines are covered by tests ✅

Additional details and impacted files

```
@@            Coverage Diff             @@
##             main    #3172      +/-   ##
==========================================
- Coverage   55.63%   55.61%   -0.02%
==========================================
  Files         443      443
  Lines       62987    62987
==========================================
- Hits        35041    35029      -12
- Misses      27946    27958      +12
```

☔ View full report in Codecov by Sentry.
Actionable comments posted: 1
🧹 Nitpick comments (4)
crates/torii/digraphmap/src/lib.rs (4)
51-53
: Pass the key once – the secondDebug
format is unnecessaryHere we already know the variant’s Display will format the string; feeding it
a string that has gone throughformat!("{:?}", key)
adds the superfluous
quotes discussed earlier. Passingkey.to_string()
(or keepingDebug
in the
error and changing the Display string as shown in error.rs) avoids the cosmetic
issue.-return Err(DigraphMapError::DuplicateKey(format!("{:?}", key))); +return Err(DigraphMapError::DuplicateKey(key.to_string()));
87-92
: Same quoting concern forNodeNotFound
-.ok_or_else(|| DigraphMapError::NodeNotFound(format!("{:?}", key))) +.ok_or_else(|| DigraphMapError::NodeNotFound(key.to_string()))Keeping the key un‑quoted yields cleaner error text.
If you adopt the{0}
change inerror.rs
, this line can remain as‑is with
format!("{:?}", …)
removed.
105-118
:topo_sort
is currently O(n²); maintain a reverse map for O(n)For every node index you iterate over the entire
node_indices
map to locate
its key. In large graphs this becomes noticeably quadratic.A lightweight fix is to store a second
HashMap<NodeIndex, K>
when you insert
nodes. You already pay the cost of a single extra key clone at insertion time,
but topological sorts become strictly linear.@@ struct DigraphMap - /// Map from node keys to their indices in the graph + /// Map from node keys to their indices in the graph node_indices: HashMap<K, NodeIndex>, + /// Reverse map for fast lookup during topo‑sort + index_to_key: HashMap<NodeIndex, K>,and on insertion:
self.node_indices.insert(key.clone(), node_idx); self.index_to_key.insert(node_idx, key);then inside
topo_sort
:if let Some(key) = self.index_to_key.get(&idx) { result.push((key.clone(), node_value)); }This leaves algorithmic complexity at O(n + e).
160-210
: Great test coverage – consider an explicit cycle topo‑sort testYou already assert that adding a back‑edge errors. To exercise the warning path
intopo_sort
, add a test that builds a graph with a deliberate cycle, calls
topo_sort
, and ensures you still get all nodes back (ordering arbitrary).
This protects the fallback branch from future regressions.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (3)
crates/torii/digraphmap/Cargo.toml
(1 hunks)crates/torii/digraphmap/src/error.rs
(1 hunks)crates/torii/digraphmap/src/lib.rs
(1 hunks)
✅ Files skipped from review due to trivial changes (1)
- crates/torii/digraphmap/Cargo.toml
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: build
🔇 Additional comments (1)
crates/torii/digraphmap/src/lib.rs (1)
32-38
: Ohayo sensei! Nice, idiomatic constructor 👌The
new()
implementation is clean and straight to the point. Nothing to tweak
here.
crates/torii/digraphmap/src/error.rs
Outdated
#[error("Node with key {0:?} not found")] | ||
NodeNotFound(String), | ||
#[error("Adding edge would create a cycle")] | ||
CycleDetected, | ||
#[error("Duplicate node key: {0:?}")] | ||
DuplicateKey(String), |
🛠️ Refactor suggestion
Avoid double‑quoting in Display messages by using {0}
instead of {0:?}
thiserror
will feed the inner String
into the format string.
Because the code currently does
DigraphMapError::NodeNotFound(format!("{:?}", key))
and the Display line again prints {0:?}
, a key like "a"
ends up rendered as
Node with key "\"a\"" not found
, i.e. with twice the quotes.
Switching to {0}
keeps the human‑readable value while avoiding the extra
escaping.
-#[error("Node with key {0:?} not found")]
+#[error("Node with key {0} not found")]
…
-#[error("Duplicate node key: {0:?}")]
+#[error("Duplicate node key: {0}")]
Actionable comments posted: 1
🧹 Nitpick comments (1)
crates/torii/adigraphmap/src/lib.rs (1)
96-99
: Cycle test useshas_path_connecting
– consider early exit optimisation.
petgraph::algo::has_path_connecting
performs a full walk each time a new edge is considered.
For dense graphs this can dominate build time. A lightweight alternative is to maintain aDag
and rely on the returnedErr
fromadd_edge
, or keep ancestor sets per node.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (3)
crates/torii/adigraphmap/Cargo.toml
(1 hunks)crates/torii/adigraphmap/src/error.rs
(1 hunks)crates/torii/adigraphmap/src/lib.rs
(1 hunks)
✅ Files skipped from review due to trivial changes (2)
- crates/torii/adigraphmap/Cargo.toml
- crates/torii/adigraphmap/src/error.rs
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: build
🔇 Additional comments (1)
crates/torii/adigraphmap/src/lib.rs (1)
42-46
: Dependency insertion requires pre‑existing nodes – is that intentional, sensei?
add_node_with_dependencies()
first inserts the current node, then callsadd_dependency()
which fails withNodeNotFound
if any dependency hasn’t been added yet.
That forces callers to create all dependency nodes beforehand, disallowing a more declarative “add everything in any order” style.If the stricter contract was not intended, you could lazily insert missing dependency nodes with a placeholder value (or return a richer error that lists the missing keys).
Would you like this crate to support out‑of‑order insertion? Let me know and I can sketch the patch.
```rust
// Collect nodes in topological order
for idx in indices {
    let node_value = self.graph[idx].clone();
    if let Some(key) =
        self.node_indices.iter().find(|&(_, &v)| v == idx).map(|(k, _)| k.clone())
    {
        result.push((key, node_value));
    }
}
```
🛠️ Refactor suggestion
Ohayo sensei! Topological helpers perform an O(N²) key‑lookup – keep a reverse map for linear time.
topo_sort()
and topo_sort_by_level()
reconstruct the (key, value)
pairs by scanning self.node_indices
for every single vertex (iter().find(|&(_, &v)| v == idx)
).
For a graph with N
vertices, that turns the overall complexity of these helpers into O(N²) and triggers a lot of allocator churn because of repeated clone
s.
A tiny reverse map maintained on insertion avoids this:
@@ struct AcyclicDigraphMap<K, V>
- node_indices: HashMap<K, NodeIndex>,
+ node_indices: HashMap<K, NodeIndex>,
+ index_keys: HashMap<NodeIndex, K>, // <— new
@@ fn add_node(&mut self, key: K, value: V)
- self.node_indices.insert(key, node_idx);
+ self.index_keys.insert(node_idx, key.clone());
+ self.node_indices.insert(key, node_idx);
@@ fn topo_sort(&self)
- if let Some(key) =
- self.node_indices.iter().find(|&(_, &v)| v == idx).map(|(k, _)| k.clone())
- {
- result.push((key, node_value));
- }
+ if let Some(key) = self.index_keys.get(&idx) {
+ result.push((key.clone(), node_value));
+ }
The same one‑liner swap (self.index_keys.get(&node)
) removes the quadratic scan inside topo_sort_by_level()
as well.
Also applies to: 150-156
Actionable comments posted: 1
🧹 Nitpick comments (2)
crates/torii/task-network/src/lib.rs (2)
32-38
: Guard against zero‑concurrency deadlock
Semaphore::new(0)
causes every worker to wait forever. Rejectmax_concurrent_tasks == 0
during construction (and maybe upper‑bound it tousize::MAX / 2
to avoid overflow surprises).pub fn new(max_concurrent_tasks: usize) -> Self { - Self { - tasks: AcyclicDigraphMap::new(), - max_concurrent_tasks, - } + assert!(max_concurrent_tasks > 0, "max_concurrent_tasks must be > 0"); + Self { + tasks: AcyclicDigraphMap::new(), + max_concurrent_tasks, + } }
122-126
: Consider clearing the graph even on early errorIf
process_tasks
returns early due to a join or task error, the internal graph remains populated, so a subsequent call will rerun failed tasks unless the caller remembers to callclear()
manually. Dropping the tasks in afinally
‑style block (tokio::select!
ormatch
) guarantees a clean state for the next run.This is a minor maintenance safeguard; feel free to keep the current semantics if callers always rebuild a fresh
TaskNetwork
.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (3)
crates/torii/task-network/Cargo.toml
(1 hunks)crates/torii/task-network/src/error.rs
(1 hunks)crates/torii/task-network/src/lib.rs
(1 hunks)
✅ Files skipped from review due to trivial changes (2)
- crates/torii/task-network/src/error.rs
- crates/torii/task-network/Cargo.toml
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: build
crates/torii/task-network/src/lib.rs
Outdated
handles.push(tokio::spawn(async move { | ||
let _permit = semaphore.acquire().await.map_err(|e| TaskNetworkError::SemaphoreError(e))?; | ||
|
||
debug!( | ||
target: LOG_TARGET, | ||
task_id = %task_id, | ||
level = level_idx, | ||
"Processing task." | ||
); | ||
|
||
match task_handler(task_id, task_clone).await { | ||
Ok(_) => Ok(()), | ||
Err(e) => { | ||
error!( | ||
target: LOG_TARGET, | ||
error = %e, | ||
task_id = %task_id, | ||
"Error processing task." | ||
); | ||
Err(e) | ||
} | ||
} | ||
})); | ||
} | ||
|
||
// Wait for all tasks in this level to complete before proceeding to the next level | ||
try_join_all(handles).await.map_err(|e| TaskNetworkError::JoinError(e))?; | ||
} |
🛠️ Refactor suggestion
Ohayo sensei – task‑level failures are silently swallowed
try_join_all
only fails fast on a JoinHandle panic/cancel.
Because every JoinHandle
resolves to Ok(Result<(), anyhow::Error>)
, an error returned by your task_handler
is wrapped inside the Ok variant and therefore never bubbles up. process_tasks
will happily return Ok(())
, masking real processing errors.
- try_join_all(handles).await.map_err(|e| TaskNetworkError::JoinError(e))?;
+ // Wait for all handles and propagate the first task‑handler error (if any)
+ let task_results = try_join_all(handles)
+ .await
+ .map_err(TaskNetworkError::JoinError)?;
+
+ if let Some(err) = task_results.into_iter().find_map(|inner| inner.err()) {
+ return Err(TaskNetworkError::TaskHandlerError(err));
+ }
Make sure `TaskNetworkError` has a `TaskHandlerError(anyhow::Error)` variant (or equivalent) so callers can react appropriately.
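A sketch of that variant, following the `thiserror` style used by the sibling crates' `error.rs` (the existing `SemaphoreError`/`JoinError` variants are inferred from how they are constructed above; attribute details are illustrative):

```rust
use thiserror::Error;

#[derive(Debug, Error)]
pub enum TaskNetworkError {
    #[error("semaphore error: {0}")]
    SemaphoreError(#[from] tokio::sync::AcquireError),
    #[error("join error: {0}")]
    JoinError(#[from] tokio::task::JoinError),
    /// Surfaces an error returned by a task handler instead of swallowing it.
    #[error("task handler error: {0}")]
    TaskHandlerError(anyhow::Error),
}
```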
Actionable comments posted: 10
♻️ Duplicate comments (13)
crates/torii/adigraphmap/src/lib.rs (2)
108-116
: Ohayo sensei! Topological helpers perform an O(N²) key-lookup – keep a reverse map for linear time.The
topo_sort()
method uses an inefficient linear search to find keys by node index, which results in O(N²) complexity for the entire topological sort. This is particularly costly for large graphs.Consider adding a reverse mapping from node indices to keys:
pub struct AcyclicDigraphMap<K, V> where K: Eq + Hash + Clone + std::fmt::Debug, V: Clone, { /// The underlying directed graph graph: DiGraph<V, ()>, /// Map from node keys to their indices in the graph node_indices: HashMap<K, NodeIndex>, + /// Map from node indices to their keys for efficient reverse lookup + index_keys: HashMap<NodeIndex, K>, }Then update
add_node()
andtopo_sort()
:pub fn add_node(&mut self, key: K, value: V) -> Result<NodeIndex> { if self.node_indices.contains_key(&key) { return Err(AcyclicDigraphMapError::DuplicateKey(format!("{:?}", key))); } let node_idx = self.graph.add_node(value); self.node_indices.insert(key.clone(), node_idx); + self.index_keys.insert(node_idx, key); Ok(node_idx) } // In topo_sort() -if let Some(key) = - self.node_indices.iter().find(|&(_, &v)| v == idx).map(|(k, _)| k.clone()) -{ - result.push((key, node_value)); -} +if let Some(key) = self.index_keys.get(&idx) { + result.push((key.clone(), node_value)); +}
150-156
: Ohayo sensei! Same O(N²) issue in topo_sort_by_level method.The same linear search inefficiency exists in the
topo_sort_by_level()
method, which could significantly impact performance.With the index_keys map suggested earlier, you can replace this lookup:
-if let Some(key) = - self.node_indices.iter().find(|&(_, &v)| v == node).map(|(k, _)| k.clone()) -{ - let value = self.graph[node].clone(); - level_nodes.push((key, value)); -} +if let Some(key) = self.index_keys.get(&node) { + let value = self.graph[node].clone(); + level_nodes.push((key.clone(), value)); +}crates/torii/processors/src/processors/store_update_member.rs (2)
70-76: Ohayo sensei! Replace the panic with a recoverable error
try_from(event).unwrap_or_else(|_| panic!(...)) will abort the whole processor task for a single bad event. Prefer bubbling the error so the task manager can decide whether to skip, retry, or mark the event failed.
-let event = match WorldEvent::try_from(event).unwrap_or_else(|_| {
-    panic!(
-        "Expected {} event to be well formed.",
-        <StoreUpdateMemberProcessor as EventProcessor<P>>::event_key(self)
-    )
-}) {
+let event = match WorldEvent::try_from(event).map_err(|e| {
+    anyhow::anyhow!(
+        "Invalid {} event format: {e}",
+        <StoreUpdateMemberProcessor as EventProcessor<P>>::event_key(self)
+    )
+})? {
89-106: Ohayo sensei! Avoid string-matching on error messages
Err(e) if e.to_string().contains("no rows") is brittle and locale-dependent. If the SQL layer exposes a specific error kind/enum (e.g. sqlx::Error::RowNotFound), match on that instead.
-Err(e) if e.to_string().contains("no rows") && !config.namespaces.is_empty() => {
+Err(sqlx::Error::RowNotFound) if !config.namespaces.is_empty() => {
If such an enum isn't available, consider adding a helper in torii_sqlite that returns a typed error you can match on.
crates/torii/processors/src/processors/store_del_record.rs (2)
52-54: Ohayo sensei! Guard against out-of-bounds panics in validate
validate unconditionally returns true, yet the processor accesses specific indices of event.keys. A malformed or truncated event would panic the whole indexer. Let's harden validation:
-fn validate(&self, _event: &Event) -> bool {
-    true
+fn validate(&self, event: &Event) -> bool {
+    // keys: [hash(StoreDelRecord), selector, entity_id]
+    event.keys.len() >= 3 && event.data.is_empty()
 }
79-89: Ohayo sensei! Replace panic! with graceful error propagation
A malformed event will currently panic! and bring down the runtime. Bubble the error instead so higher-level supervisors can decide whether to skip or retry.
-let event = match WorldEvent::try_from(event).unwrap_or_else(|_| {
-    panic!(
-        "Expected {} event to be well formed.",
-        <StoreDelRecordProcessor as EventProcessor<P>>::event_key(self)
-    )
-}) {
+let event = match WorldEvent::try_from(event).map_err(|e| {
+    anyhow::anyhow!(
+        "Failed to decode {} event: {e}",
+        <StoreDelRecordProcessor as EventProcessor<P>>::event_key(self)
+    )
+})? {
80-97
: Ohayo sensei! Avoid string‑based SQL error matching.Matching
e.to_string().contains("no rows")
is brittle and locale‑dependent. If the SQL layer exposes a specific error kind/enum, match on that instead.
66-76
: 🛠️ Refactor suggestionOhayo sensei! Replace panic with graceful error propagation.
The current implementation panics on deserialization failure. This is not resilient and could bring down the entire processing pipeline.
-let event = match WorldEvent::try_from(event).unwrap_or_else(|_| { - panic!( - "Expected {} event to be well formed.", - <StoreSetRecordProcessor as EventProcessor<P>>::event_key(self) - ) -}) { - WorldEvent::StoreSetRecord(e) => e, - _ => { - unreachable!() - } +let event = match WorldEvent::try_from(event) { + Ok(WorldEvent::StoreSetRecord(e)) => e, + Ok(_) => { + return Err(anyhow::anyhow!( + "Received a non-StoreSetRecord event under key {}", + self.event_key() + )); + } + Err(e) => { + return Err(anyhow::anyhow!( + "Failed to decode {} event: {}", + self.event_key(), + e + )); + } }crates/torii/processors/src/processors/mod.rs (2)
51-52: Ohayo sensei! Fatal type-mismatch in EventProcessorMap definition
EventProcessorMap is declared as HashMap<EventKey, Box<…>>, yet get_event_processors (and the design intent of allowing multiple processors per key) expects a HashMap<…, Vec<Box<…>>>.
This discrepancy will prevent processors with the same event key (like the two "Transfer" processors for ERC-20/721) from both being registered, as they'll silently overwrite each other.
type EventKey = String;
-type EventProcessorMap<P> = HashMap<EventKey, Box<dyn EventProcessor<P>>>;
+type EventProcessorMap<P> = HashMap<EventKey, Vec<Box<dyn EventProcessor<P>>>>;
123-127
: Ohayo sensei! Processors are being overwritten instead of accumulatedBecause
.insert(key, processor)
is used, only the last processor for a given event‑key survives.
After applying the previous fix to makeEventProcessorMap
use a Vec, accumulate processors withentry(key).or_default().push(...)
:-let key = processor.event_key(); -event_processors_map.entry(contract_type).or_default().insert(key, processor); +let key = processor.event_key(); +event_processors_map + .entry(contract_type) + .or_default() + .entry(key) + .or_default() + .push(processor);crates/torii/processors/src/lib.rs (3)
30-34: Ohayo sensei – Add Send bound to provider generic to satisfy async_trait default
The async_trait macro produces a Send future by default. Because the provider P is awaited within the .process() implementation, the compiler will require P to be Send.
-where
-    P: Provider + Sync,
+where
+    P: Provider + Send + Sync,
This is a critical fix to ensure your code compiles when a provider is used in an async context.
79-81: ⚠️ Potential issue
Ohayo sensei – Add Send bound to provider generic in this trait as well
For consistency with the other processor traits and to ensure proper async functionality, add the Send bound to the provider generic in TransactionProcessor too.
-pub trait TransactionProcessor<P: Provider + Sync + std::fmt::Debug>: Send + Sync {
+pub trait TransactionProcessor<P: Provider + Send + Sync + std::fmt::Debug>: Send + Sync {
67-69: ⚠️ Potential issue
Ohayo sensei – Add Send bound to provider generic here too
Similar to the issue with EventProcessor, the BlockProcessor trait should also include the Send bound on provider P for consistency and to avoid compiler errors with async traits.
-pub trait BlockProcessor<P: Provider + Sync>: Send + Sync {
+pub trait BlockProcessor<P: Provider + Send + Sync>: Send + Sync {
🧹 Nitpick comments (7)
crates/torii/adigraphmap/src/lib.rs (4)
189-192
: Ohayo sensei! Clear method should update index_keys too.If you implement the suggested
index_keys
map, don't forget to clear it here too.pub fn clear(&mut self) { self.graph.clear(); self.node_indices.clear(); + self.index_keys.clear(); }
121-176
: Ohayo sensei! topo_sort_by_level would benefit from algorithm documentation.The implementation of
topo_sort_by_level
is complex and could benefit from more detailed documentation explaining the algorithm.Consider adding a more detailed doc comment, for example:
/// Get the nodes grouped by topological level, where each level contains nodes /// that can be processed in parallel (no dependencies among them). /// Returns a vector of vectors, where each inner vector represents a level. /// /// # Algorithm /// /// 1. Start with nodes that have no incoming edges (level 0) /// 2. For each level: /// a. Process all nodes in the current level /// b. For each outgoing edge, decrement the incoming edge count of the target /// c. Any node whose incoming edge count becomes 0 is added to the next level /// 3. Continue until all nodes are processed /// /// This ensures that processing all nodes in a level makes the next level ready /// for processing, allowing for parallel execution within each level. pub fn topo_sort_by_level(&self) -> Vec<Vec<(K, V)>> { // ...existing implementation... }
205-297
: Ohayo sensei! Tests look comprehensive but could use additional cases.The current tests are good, covering the basic functionality, but they miss some edge cases.
Consider adding tests for:
- Empty graph behavior:
#[test] fn test_empty_graph() { let graph: AcyclicDigraphMap<String, i32> = AcyclicDigraphMap::new(); assert!(graph.is_empty()); assert_eq!(graph.len(), 0); assert!(graph.topo_sort().is_empty()); assert!(graph.topo_sort_by_level().is_empty()); }
- More complex cycle detection:
#[test] fn test_complex_cycle_detection() { let mut graph: AcyclicDigraphMap<String, i32> = AcyclicDigraphMap::new(); // Create a graph: a -> b -> c -> d graph.add_node("a".to_string(), 1).unwrap(); graph.add_node("b".to_string(), 2).unwrap(); graph.add_node("c".to_string(), 3).unwrap(); graph.add_node("d".to_string(), 4).unwrap(); graph.add_dependency(&"a".to_string(), &"b".to_string()).unwrap(); graph.add_dependency(&"b".to_string(), &"c".to_string()).unwrap(); graph.add_dependency(&"c".to_string(), &"d".to_string()).unwrap(); // Attempting to add d -> a would create a cycle assert!(graph.add_dependency(&"d".to_string(), &"a".to_string()).is_err()); }
- Test for modifying values:
#[test] fn test_get_mut() { let mut graph: AcyclicDigraphMap<String, i32> = AcyclicDigraphMap::new(); graph.add_node("a".to_string(), 1).unwrap(); // Modify the value if let Some(val) = graph.get_mut(&"a".to_string()) { *val = 42; } // Verify the change assert_eq!(graph.get(&"a".to_string()), Some(&42)); }
33-33
: Ohayo sensei! Initialize with capacity when known in advance.When creating a new
AcyclicDigraphMap
, it's often possible to know the expected capacity in advance.Consider adding a constructor with capacity:
pub fn new() -> Self { Self { graph: DiGraph::new(), node_indices: HashMap::new() } } +/// Create a new empty DigraphMap with the specified capacity +pub fn with_capacity(capacity: usize) -> Self { + Self { + graph: DiGraph::with_capacity(capacity, capacity * 2), // Nodes and edges + node_indices: HashMap::with_capacity(capacity), + } +}This can significantly improve performance by reducing rehashing when adding many nodes at once.
crates/torii/processors/src/lib.rs (3)
18-22
: Ohayo sensei – Consider adding documentation to explain config fieldsThe
EventProcessorConfig
struct would benefit from documentation explaining the purpose of each field, especiallystrict_model_reader
whose function isn't immediately clear from the name.#[derive(Clone, Debug, Default)] +/// Configuration for event processors. pub struct EventProcessorConfig { + /// Set of namespaces to index. If empty, all namespaces are indexed. pub namespaces: HashSet<String>, + /// When true, enforces strict validation of models during reading operations. pub strict_model_reader: bool, }
38-40
: Ohayo sensei – Consider using iterators more idiomaticallyThe current implementation of
event_keys_as_string
could be written more idiomatically:fn event_keys_as_string(&self, event: &Event) -> String { - event.keys.iter().map(|i| format!("{:#064x}", i)).collect::<Vec<_>>().join(",") + event.keys.iter() + .map(|i| format!("{:#064x}", i)) + .collect::<Vec<_>>() + .join(",") }This improves readability without changing functionality.
46-50
: Ohayo sensei – Consider documenting hash-based task identification strategyThe task identifier implementation uses a deterministic hash of the event keys. This is a good approach but would benefit from a brief documentation comment explaining the strategy and why it's appropriate.
+/// Generates a unique task identifier by hashing the event keys. +/// This ensures the same event always produces the same task ID. fn task_identifier(&self, event: &Event) -> TaskId { let mut hasher = DefaultHasher::new(); event.keys.hash(&mut hasher); hasher.finish() }
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (13)
crates/torii/adigraphmap/Cargo.toml (1 hunks)
crates/torii/adigraphmap/src/error.rs (1 hunks)
crates/torii/adigraphmap/src/lib.rs (1 hunks)
crates/torii/processors/Cargo.toml (1 hunks)
crates/torii/processors/src/lib.rs (1 hunks)
crates/torii/processors/src/processors/mod.rs (1 hunks)
crates/torii/processors/src/processors/store_del_record.rs (1 hunks)
crates/torii/processors/src/processors/store_set_record.rs (1 hunks)
crates/torii/processors/src/processors/store_transaction.rs (1 hunks)
crates/torii/processors/src/processors/store_update_member.rs (1 hunks)
crates/torii/processors/src/processors/store_update_record.rs (1 hunks)
crates/torii/processors/src/processors/upgrade_event.rs (1 hunks)
crates/torii/processors/src/processors/upgrade_model.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
- crates/torii/processors/Cargo.toml
- crates/torii/adigraphmap/Cargo.toml
- crates/torii/processors/src/processors/upgrade_model.rs
- crates/torii/processors/src/processors/upgrade_event.rs
- crates/torii/processors/src/processors/store_transaction.rs
- crates/torii/processors/src/processors/store_update_record.rs
🧰 Additional context used
🧬 Code Graph Analysis (1)
crates/torii/processors/src/processors/store_set_record.rs (6)
crates/torii/sqlite/sqlite/src/utils.rs (1)
  felts_to_sql_string (35-38)
crates/torii/processors/src/processors/store_del_record.rs (4)
  contract_type (44-46)
  event_key (48-50)
  validate (52-54)
  process (67-126)
crates/torii/processors/src/processors/store_update_member.rs (4)
  contract_type (46-48)
  event_key (50-52)
  validate (54-56)
  process (58-137)
crates/torii/processors/src/processors/store_update_record.rs (4)
  contract_type (45-47)
  event_key (49-51)
  validate (53-55)
  process (57-128)
crates/torii/processors/src/lib.rs (7)
  event (39-39)
  contract_type (35-35)
  event_key (36-36)
  validate (52-52)
  process (55-64)
  process (70-76)
  process (81-91)
crates/torii/processors/src/processors/upgrade_model.rs (4)
  contract_type (27-29)
  event_key (31-33)
  validate (37-39)
  process (42-138)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: build
🔇 Additional comments (3)
crates/torii/adigraphmap/src/error.rs (1)
1-12: Ohayo sensei! Error enum is well-defined and comprehensive.
The error enum is well structured with clear error messages and appropriate derivation of the Error and Debug traits. The three error cases (node not found, cycle detection, and duplicate key) cover all the necessary failure scenarios for an acyclic digraph implementation.
crates/torii/processors/src/lib.rs (2)
42-44: Ohayo sensei – Task dependencies default implementation looks good
The default implementation returning an empty vector is a sensible default that can be overridden by specific processors that need to express dependencies.
54-65: Ohayo sensei – The process method appropriately allows many arguments
The #[allow(clippy::too_many_arguments)] attribute is justified here given the nature of the method. It needs substantial context to process events correctly. The parameters are well-organized and clearly named.
/// Add a node with dependencies to the graph
pub fn add_node_with_dependencies(
    &mut self,
    key: K,
    value: V,
    dependencies: Vec<K>,
) -> Result<NodeIndex> {
    let node_idx = self.add_node(key.clone(), value)?;
    for dependency in dependencies {
        self.add_dependency(&dependency, &key)?;
    }
    Ok(node_idx)
}
🛠️ Refactor suggestion
Ohayo sensei! add_node_with_dependencies should be atomic.
The method adds a node first, then adds dependencies one by one. If adding a dependency fails (e.g., due to a cycle), the node remains in the graph without all its dependencies, leaving the graph in a potentially inconsistent state.
Consider making the operation atomic by validating all dependencies first:
pub fn add_node_with_dependencies(
&mut self,
key: K,
value: V,
dependencies: Vec<K>,
) -> Result<NodeIndex> {
+ // Validate all dependencies exist first
+ for dependency in &dependencies {
+ if !self.node_indices.contains_key(dependency) {
+ return Err(AcyclicDigraphMapError::NodeNotFound(format!("{:?}", dependency)));
+ }
+ }
+
+ // Check for cycles before adding anything
+ if self.node_indices.contains_key(&key) {
+ return Err(AcyclicDigraphMapError::DuplicateKey(format!("{:?}", key)));
+ }
+
+ // Temporarily add the node to check for cycles
+ let node_idx = self.graph.add_node(value.clone());
+
+ // Check if adding any dependency would create a cycle
+ for dependency in &dependencies {
+ let from_idx = self.node_indices.get(dependency).unwrap().clone();
+ if petgraph::algo::has_path_connecting(&self.graph, node_idx, from_idx, None) {
+ // Remove the temporary node if we'd create a cycle
+ self.graph.remove_node(node_idx);
+ return Err(AcyclicDigraphMapError::CycleDetected);
+ }
+ }
+
+ // Remove the temporary node if checks passed
+ self.graph.remove_node(node_idx);
+
let node_idx = self.add_node(key.clone(), value)?;
for dependency in dependencies {
self.add_dependency(&dependency, &key)?;
}
Ok(node_idx)
}
This solution is more complex but ensures the graph remains consistent. Alternatively, you could implement a transaction-like pattern that rolls back changes on error.
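For the roll-back alternative, a hypothetical sketch – the field names graph and node_indices come from the struct shown earlier in this review, while the rollback bookkeeping itself is illustrative, not the crate's actual implementation:
pub fn add_node_with_dependencies(
    &mut self,
    key: K,
    value: V,
    dependencies: Vec<K>,
) -> Result<NodeIndex> {
    let node_idx = self.add_node(key.clone(), value)?;
    for dependency in &dependencies {
        if let Err(e) = self.add_dependency(dependency, &key) {
            // Roll back: this node was added last, so removing it cannot
            // invalidate the indices stored for other keys.
            self.graph.remove_node(node_idx);
            self.node_indices.remove(&key);
            return Err(e);
        }
    }
    Ok(node_idx)
}
If the reverse index_keys map suggested above is adopted, it would need the same cleanup on the error path.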
fn validate(&self, _event: &Event) -> bool {
    true
}
🛠️ Refactor suggestion
Ohayo sensei! Improve event validation to prevent processing invalid events.
The current implementation of validate simply returns true, which means all events tagged as "StoreUpdateMember" will be processed regardless of their structure. Consider adding validation to check if the event contains the expected keys and data structure.
fn validate(&self, event: &Event) -> bool {
- true
+ // Keys should be [hash(StoreUpdateMember), model_selector, entity_id, member_selector]
+ event.keys.len() >= 4 && !event.data.is_empty()
}
let mut values = event.values.to_vec();
member.ty.deserialize(&mut values)?;

let wrapped_ty = Ty::Struct(Struct { name: schema.name(), children: vec![member] });
🛠️ Refactor suggestion
Ohayo sensei! Add validation for sufficient values in the event data.
The code deserializes event.values without checking if it contains enough data for the member type, which could lead to runtime errors or incorrect data if the event is malformed.
let mut values = event.values.to_vec();
+// Validate that we have enough values for deserialization
+if values.is_empty() {
+ return Err(anyhow::anyhow!("Empty values array in StoreUpdateMember event"));
+}
member.ty.deserialize(&mut values)?;
impl<Event> TaskProcessor<Event> for StoreUpdateMemberProcessor {
    fn dependencies(&self, event: &Event) -> Vec<TaskId> {
        let mut hasher = DefaultHasher::new();
        // model selector
        event.keys[1].hash(&mut hasher);
        vec![hasher.finish()]
    }

    fn identifier(&self, event: &Event) -> TaskId {
        let mut hasher = DefaultHasher::new();
        // model selector
        event.keys[1].hash(&mut hasher);
        // entity id
        event.keys[2].hash(&mut hasher);
        hasher.finish()
    }
}
🛠️ Refactor suggestion
Ohayo sensei! Protect against potential IndexOutOfBounds errors in TaskProcessor implementation.
The dependencies and identifier methods access event keys by index without checking if these indices are valid, which could cause a panic if the event structure is unexpected.
fn dependencies(&self, event: &Event) -> Vec<TaskId> {
let mut hasher = DefaultHasher::new();
// model selector
- event.keys[1].hash(&mut hasher);
+ if event.keys.len() > 1 {
+ event.keys[1].hash(&mut hasher);
+ }
vec![hasher.finish()]
}
fn identifier(&self, event: &Event) -> TaskId {
let mut hasher = DefaultHasher::new();
// model selector
- event.keys[1].hash(&mut hasher);
- // entity id
- event.keys[2].hash(&mut hasher);
+ if event.keys.len() > 1 {
+ event.keys[1].hash(&mut hasher);
+ // entity id
+ if event.keys.len() > 2 {
+ event.keys[2].hash(&mut hasher);
+ }
+ }
hasher.finish()
}
let model = match db.model(event.selector).await {
    Ok(m) => m,
    Err(e) if e.to_string().contains("no rows") && !config.namespaces.is_empty() => {
        debug!(
            target: LOG_TARGET,
            selector = %event.selector,
            "Model does not exist, skipping."
        );
        return Ok(());
    }
    Err(e) => {
        return Err(anyhow::anyhow!(
            "Failed to retrieve model with selector {:#x}: {}",
            event.selector,
            e
        ));
    }
🛠️ Refactor suggestion
Ohayo sensei! Refactor error handling to use typed errors instead of string matching.
String matching on error messages is brittle and could break if the underlying library changes its error messages.
-Err(e) if e.to_string().contains("no rows") && !config.namespaces.is_empty() => {
+Err(e) if is_row_not_found_error(&e) && !config.namespaces.is_empty() => {
Add a helper function to torii_sqlite or locally:
fn is_row_not_found_error(e: &dyn std::error::Error) -> bool {
// Check for specific error types from your SQL library
// Example for sqlx:
// e.downcast_ref::<sqlx::Error>().map_or(false, |e| matches!(e, sqlx::Error::RowNotFound))
e.to_string().contains("no rows") // Temporary fallback
}
impl TaskProcessor for StoreDelRecordProcessor {
    fn dependencies(&self, event: &Event) -> Vec<TaskId> {
        let mut hasher = DefaultHasher::new();
        // model selector
        event.keys[0].hash(&mut hasher);

        vec![hasher.finish()]
    }
Ohayo sensei! Fix inconsistent key indexing in dependencies method.
The dependencies method uses event.keys[0] while other processors and the identifier method in this same processor use event.keys[1] for the model selector. This inconsistency could lead to incorrect dependency tracking.
fn dependencies(&self, event: &Event) -> Vec<TaskId> {
let mut hasher = DefaultHasher::new();
// model selector
- event.keys[0].hash(&mut hasher);
+ event.keys[1].hash(&mut hasher);
vec![hasher.finish()]
}
fn task_priority(&self) -> TaskPriority {
    2
}

fn task_identifier(&self, event: &Event) -> TaskId {
    let mut hasher = DefaultHasher::new();
    event.keys[1].hash(&mut hasher);
    event.keys[2].hash(&mut hasher);
    hasher.finish()
}
🛠️ Refactor suggestion
Ohayo sensei! Remove redundant task_identifier method.
The task_identifier method duplicates the functionality already implemented in the identifier method from the TaskProcessor trait. This duplication could lead to maintenance issues if they get out of sync.
-fn task_priority(&self) -> TaskPriority {
- 2
-}
-
-fn task_identifier(&self, event: &Event) -> TaskId {
- let mut hasher = DefaultHasher::new();
- event.keys[1].hash(&mut hasher);
- event.keys[2].hash(&mut hasher);
- hasher.finish()
-}
+fn task_priority(&self) -> TaskPriority {
+ 2
+}
fn validate(&self, _event: &Event) -> bool {
    true
}
🛠️ Refactor suggestion
Ohayo sensei! Add proper event validation.
The current validation always returns true, which means all events will be processed regardless of their structure. This could lead to runtime errors or panics when accessing event keys or data.
-fn validate(&self, _event: &Event) -> bool {
- true
+fn validate(&self, event: &Event) -> bool {
+ // Ensure we have the expected key structure
+ // keys: [hash(StoreSetRecord), selector, entity_id, ...]
+ event.keys.len() >= 3 && !event.values.is_empty()
}
let mut keys_and_unpacked = [event.keys, event.values].concat();

let mut entity = model.schema;
entity.deserialize(&mut keys_and_unpacked)?;
🛠️ Refactor suggestion
Ohayo sensei! Add validation before array concatenation and deserialization.
The code concatenates keys and values arrays and then deserializes into the entity without validating the structure or lengths. This could lead to incorrect data or runtime errors.
-let mut keys_and_unpacked = [event.keys, event.values].concat();
+// Validate the expected structure before concatenation
+if event.keys.len() < 3 {
+ return Err(anyhow::anyhow!("Insufficient keys in StoreSetRecord event"));
+}
+
+let mut keys_and_unpacked = [event.keys, event.values].concat();
-entity.deserialize(&mut keys_and_unpacked)?;
+// Wrap deserialization in explicit error context
+entity.deserialize(&mut keys_and_unpacked).context("Failed to deserialize entity from event data")?;
pub fn get_event_processors(
    &self,
    contract_type: ContractType,
) -> &HashMap<EventKey, Box<dyn EventProcessor<P>>> {
    self.event_processors.get(&contract_type).unwrap()
}
🛠️ Refactor suggestion
Ohayo sensei! Add error handling for missing contract types.
The get_event_processors method unwraps the result of get(&contract_type) without checking if it exists. This would panic if called with an unsupported contract type.
pub fn get_event_processors(
&self,
contract_type: ContractType,
) -> &HashMap<EventKey, Box<dyn EventProcessor<P>>> {
- self.event_processors.get(&contract_type).unwrap()
+ self.event_processors.get(&contract_type).unwrap_or_else(|| {
+ // Either return an empty map or panic with a better error message
+ panic!("No processors registered for contract type {:?}", contract_type)
+ })
}
Better yet, return an Option or Result to allow callers to handle missing contract types gracefully:
pub fn get_event_processors(
&self,
contract_type: ContractType,
-) -> &HashMap<EventKey, Box<dyn EventProcessor<P>>> {
- self.event_processors.get(&contract_type).unwrap()
+) -> Option<&HashMap<EventKey, Box<dyn EventProcessor<P>>>> {
+ self.event_processors.get(&contract_type)
}
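A hypothetical call site for the Option-returning variant (the registry variable name is illustrative, not part of the PR):
if let Some(processors) = registry.get_event_processors(contract_type) {
    // dispatch the event to every processor registered for this contract type
    for processor in processors.values() {
        // ...
    }
} else {
    tracing::warn!(?contract_type, "no processors registered for contract type");
}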
Actionable comments posted: 0
♻️ Duplicate comments (1)
crates/torii/task-network/src/lib.rs (1)
94-121: ⚠️ Potential issue
‼️ Task-handler errors are still silently swallowed – propagate the inner Err
Ohayo sensei! try_join_all only fails fast on a JoinError. Because each JoinHandle wraps the actual task result inside an Ok(_), an error returned by your task_handler ends up as Ok(Err(anyhow::Error)). The current map_err(TaskNetworkError::JoinError) therefore converts only panics/cancels, allowing real task failures to slip through, and process_tasks returns Ok(()).
The exact same issue was raised in the previous review, so tagging this as a duplicate.
A minimal change that preserves the level semantics:
- // Wait for all tasks in this level to complete before proceeding to the next level
- try_join_all(handles).await.map_err(|e| TaskNetworkError::JoinError(e))?;
+ // Wait for all handles and surface the *first* task-handler error (if any)
+ let task_results = try_join_all(handles)
+     .await
+     .map_err(TaskNetworkError::JoinError)?;
+
+ if let Some(err) = task_results.into_iter().find_map(|inner| inner.err()) {
+     return Err(TaskNetworkError::TaskHandlerError(err));
+ }
You'll need a new enum variant: TaskHandlerError(anyhow::Error) in TaskNetworkError, plus a From<anyhow::Error> impl if you'd like ergonomic ? usage upstream.
Without this fix, upstream callers can never detect a failed task – scary for anything more critical than a test harness!
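As an illustration of that conversion, a small sketch assuming the TaskHandlerError variant sketched earlier in this thread exists:
impl From<anyhow::Error> for TaskNetworkError {
    fn from(err: anyhow::Error) -> Self {
        TaskNetworkError::TaskHandlerError(err)
    }
}
With that in place, a task result of type anyhow::Result<()> can be propagated inside process_tasks with the ? operator instead of an explicit map_err.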
🧹 Nitpick comments (1)
crates/torii/task-network/src/lib.rs (1)
145-147: Consider using available_parallelism().unwrap_or(1) instead of hard-coding 4
Falling back to 4 threads on single-core CI boxes can lead to unnecessary context-switching overhead. A safer default is 1, which scales naturally when available_parallelism succeeds.
- Self::new(std::thread::available_parallelism().map_or(4, |p| p.get()))
+ Self::new(std::thread::available_parallelism().map_or(1, |p| p.get()))
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (1)
crates/torii/task-network/src/lib.rs (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: build
Summary by CodeRabbit
New Features
Chores