Skip to content

feat: stream SyncState response#751

Closed
TomasArrachea wants to merge 58 commits intonextfrom
tomasarrachea-sync-component-with-streaming
Closed

feat: stream SyncState response#751
TomasArrachea wants to merge 58 commits intonextfrom
tomasarrachea-sync-component-with-streaming

Conversation

@TomasArrachea
Copy link
Collaborator

@TomasArrachea TomasArrachea commented Feb 20, 2025

This PR updates the client's sync command to adapt to the new Streaming response introduced in 0xMiden/node#685.

The client is no longer needed to execute the sync request multiple times until the chain tip is reached, it just needs to read the multiple updates from the response Stream.

tomyrd and others added 30 commits December 30, 2024 17:17
* feat: remove steps from state sync interface

* feat: add block relevance in `StateSyncUpdate`

* fix: web client build

* chore: update docs

* add failing test related to TODOs

* fix: add nullifiers for new untracked notes in each step

* use `Arc<RwLoc<NoteUpdates>>` in callbacks

* feat: reduce the number of callbacks

* remove `NoteUpdates` as callback parameter

* refactor callbacks

* update docs

* change callbacks and move state transitions

* fix: web store

* refator: use partial mmr with current block in sync
Copy link
Contributor

@tomyrd tomyrd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a full review, I'll wait for the merge/rebase so the changes are a bit more clear.

Comment on lines 155 to 161
while let Some(res) = stream
.message()
.await
.map_err(|e| ClientError::RpcError(ConnectionError(e.message().to_string())))?
{
self.sync_state_step(
res.try_into().map_err(ClientError::RpcError)?,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still not sure about having this logic in the client side vs the rpc side. We even have to transform between the rpc and domain types here which is something usually done inside the rpc client.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that the problem is that moving this logic to the rpc side would mean that the rpc_api.sync_state call will need to wait until the stream is closed and return all the collected sync responses. Those would be processed on the client side once the stream is closed. I'm not convinced if that is the best way to go, doesn't seem to go along with the streaming logic.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a good solution would be that the rpc side returns an iterator that lazily maps the response into a domain type, but not sure if that is possible

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with both of you! I think a wrapper over the stream that converts the responses appropriatly could work out well

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented this wrapper as SyncStateStream on 9dcdcac

Comment on lines 169 to 176
// New nullfiers should be added for new untracked notes
nullifiers_tags.append(
&mut state_sync_update
.note_updates
.updated_input_notes()
.map(|note| get_nullifier_prefix(&note.nullifier()))
.collect::<Vec<_>>(),
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would remove the declaration of the nullifier_tags at the start of this function and directly build the array here with the values from the note_updates

}
}
}
let transaction_updates = TransactionUpdates::new(transactions, discarded_transactions);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would remove transactions if we don't use it:

Suggested change
let transaction_updates = TransactionUpdates::new(transactions, discarded_transactions);
let transaction_updates = TransactionUpdates::new(vec![], discarded_transactions);

@@ -322,17 +331,14 @@ impl StateSync {
&mut self,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should remove the The transaction updates might include: part of the doc comment above as we don't update the transactions here anymore. I would move each bullet point to the section in the code where we make each update.

@tomyrd tomyrd mentioned this pull request Feb 24, 2025
5 tasks
@TomasArrachea TomasArrachea force-pushed the tomasarrachea-sync-component-with-streaming branch from c2b31ab to e6dc2f1 Compare February 24, 2025 20:15
@TomasArrachea TomasArrachea changed the title WIP: stream SyncState response feat: stream SyncState response Feb 24, 2025
@TomasArrachea TomasArrachea force-pushed the tomasarrachea-sync-component-with-streaming branch from 97ee6db to cf5d48b Compare February 24, 2025 20:47
@TomasArrachea TomasArrachea marked this pull request as ready for review February 24, 2025 21:05
Copy link
Contributor

@tomyrd tomyrd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, left some minor comments

Comment on lines 151 to 154
let unspent_nullifiers = unspent_input_notes
.iter()
.map(InputNoteRecord::nullifier)
.chain(unspent_output_notes.iter().filter_map(OutputNoteRecord::nullifier));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this isn't needed. The unspent nullifiers should already be included in the state_sync_update.note_updates. (This also means that we could remove the .clone() from these vecs)

Comment on lines 167 to 168
// * Transactions that were committed. Some of these might be tracked by the client and need
// to be marked as committed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part of the transaction update is note done here. This part only deals with discarded transactions. I would move this to line 214 where we add committed transactions.

note_updates.insert_updates(Some(public_note), None);
}

//
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an empty comment here

Copy link
Collaborator

@igamigo igamigo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great! Leaving some comments for now

CHANGELOG.md Outdated

* Add check for empty pay to id notes (#714).
* [BREAKING] Refactored authentication out of the `Client` and added new separate authenticators (#718).
* Read Sync State response from stream (#751).
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Let's write this in past tense

Comment on lines 142 to 173
// Check for new nullifiers for input notes that were updated
let nullifiers_tags: Vec<u16> = state_sync_update
.note_updates
.updated_input_notes()
.map(|note| note.nullifier().prefix())
.collect();

let new_nullifiers = self
.rpc_api
.check_nullifiers_by_prefix(&nullifiers_tags, current_block_num)
.await?;

// Process nullifiers and track the updates of local tracked transactions that were
// discarded because the notes that they were processing were nullified by an
// another transaction.
let mut discarded_transactions = vec![];

for (nullifier, block_num) in new_nullifiers {
let nullifier_update = NullifierUpdate { nullifier, block_num };

let discarded_transaction =
state_sync_update.note_updates.apply_nullifiers_state_transitions(
&nullifier_update,
state_sync_update.transaction_updates.committed_transactions(),
)?;

if let Some(transaction_id) = discarded_transaction {
discarded_transactions.push(transaction_id);
}
}
let transaction_updates = TransactionUpdates::new(vec![], discarded_transactions);
state_sync_update.transaction_updates.extend(transaction_updates);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could factor this out to its own fucntion (sometihng like sync_nullifiers())

Comment on lines 155 to 161
while let Some(res) = stream
.message()
.await
.map_err(|e| ClientError::RpcError(ConnectionError(e.message().to_string())))?
{
self.sync_state_step(
res.try_into().map_err(ClientError::RpcError)?,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with both of you! I think a wrapper over the stream that converts the responses appropriatly could work out well

Base automatically changed from tomyrd-sync-component-alt to next March 28, 2025 19:53
@tomyrd
Copy link
Contributor

tomyrd commented Mar 31, 2025

closing as it's too outdated with the client and most of the changes here are already included in #817 . We can open a new PR if we want to add the stream sync and use this as an example.

@tomyrd tomyrd closed this Mar 31, 2025
@bobbinth bobbinth deleted the tomasarrachea-sync-component-with-streaming branch July 3, 2025 05:57
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants