chore(ntx-builder): DataStore & DB writers refactor #1689
Mirko-von-Leipzig merged 4 commits into next
Conversation
crates/ntx-builder/src/builder.rs
/// Receiver for notifications from account actors (e.g., note failures).
notification_rx: mpsc::UnboundedReceiver<ActorNotification>,
I think this should be bounded; actors can wait to submit.
Should we limit it to 1 message at a time?
The actor needs to wait until the notification has been handled in order to proceed. So having it be a single one at a time makes sense.
So we might also want a return oneshot channel so the actor knows it's been completed.
hmmm in that case we should keep one receiver per actor in the coordinator, since oneshot is single producer, right?
There are several patterns we can use here; but the one I'm proposing looks like this:
struct Coordinator {
    rx_notes_failed: tokio::sync::mpsc::Receiver<(
        // Notes that failed.
        HashSet<NoteId>,
        // Return channel to inform the note sender that processing is done.
        tokio::sync::oneshot::Sender<()>,
    )>,
    ...
}
impl Coordinator {
    async fn handle_note_failure(&mut self, notes: HashSet<NoteId>, notify: oneshot::Sender<()>) {
        self.db.increment_note_failures(notes).await;
        // oneshot::Sender::send is synchronous and consumes the sender.
        let _ = notify.send(());
    }
}
struct Actor {
    tx_notes_failed: tokio::sync::mpsc::Sender<..as above..>,
}
impl Actor {
    // Channel closure handling elided as demo code.
    async fn process_note_failure(&self, notes: HashSet<NoteId>) {
        let (tx, rx) = tokio::sync::oneshot::channel();
        let _ = self.tx_notes_failed.send((notes, tx)).await;
        // Wait for coordinator to inform us that processing completed.
        // If we don't wait, we risk a data race with the note failure
        // being updated in the database.
        let _ = rx.await;
    }
}
There are alternatives, but this has the advantage that the oneshot return channel's usage is obviously tied to the note-failure request. You could have an additional channel per actor, or try to re-use the notification channel, but then you run into other problems, e.g. that channel may already carry other signals.
Force-pushed ce60fd3 to 931937b
Force-pushed 931937b to 9198f08
@Mirko-von-Leipzig should I wait for any other review to merge this one?
closes #1657
closes #1628
This PR includes two major changes:
Persistent note script cache.
Adds a note_script table to the database so that fetched note scripts survive restarts.
Centralized DB writes via actor notification channel.
In the phase 2 PR, when we switched from in-memory storage to the SQLite DB, actors were still writing to the DB directly to mark notes as failing. In this PR I added a channel between actor and coordinator so that the actor can emit an event to the coordinator when it fails to consume a note.