-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: exex manager #7340
feat: exex manager #7340
Conversation
ac70d36
to
46d5d8d
Compare
46d5d8d
to
69d4abe
Compare
cdbcb5f
to
736762e
Compare
736762e
to
d8a3793
Compare
c773bab
to
0563fce
Compare
//! ExEx's are initialized using an async closure that resolves to the ExEx; this closure gets | ||
//! passed an [`ExExContext`] where it is possible to spawn additional tasks and modify Reth. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we link node builder here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think that requires importing the node builder?
exex_tx: self.exex_tx.clone(), | ||
num_exexs: self.num_exexs, | ||
is_ready_receiver: self.is_ready_receiver.clone(), | ||
is_ready: WatchStream::new(self.is_ready_receiver.clone()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can't clone a WatchStream
so we need to keep the receiver as an additional field and create a new WatchStream
on clone
PR is RFR except I'm adding tests |
impl ExExManagerHandle { | ||
/// Synchronously send a notification over the channel to all execution extensions. | ||
/// | ||
/// Senders should call [`Self::has_capacity`] first. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we call it inside this method instead, so that caller can't go beyond the capacity?
can't use `id` in the span 🤷
shoutout alexey
e4e4f74
to
e17da42
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a few nits
fn send( | ||
&mut self, | ||
cx: &mut Context<'_>, | ||
(event_id, notification): &(usize, CanonStateNotification), | ||
) -> Poll<Result<(), PollSendError<CanonStateNotification>>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this type is basically a Sink,
we should also replicate the logic here with poll_reserve as a separate function the manager calls
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you want to implement sink?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, we should just add the equivalent fo poll_reserve,
but can also do this separately, so no blocker here
let notification_id = exex | ||
.next_notification_id | ||
.checked_sub(self.min_id) | ||
.expect("exex expected notification ID outside the manager's range"); | ||
if let Some(notification) = self.buffer.get(notification_id) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this lookup is a bit weird because this now looks up by index, I guess this is fine since we're keeping track of all the ids in the exexhandle, but ideally we lookup the buffer by id
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so you'd want a map instead?
Adds a manager for ExEx's. The manager is responsible for delivering
CanonStateNotification
s to ExEx's, keeping track of what notifications they have already been delivered, and managing an internal buffer of these notifications, as well as backpressure from anyone holding the handle.The manager has a handle,
ExExManagerHandle
that is going to be used by a few components, e.g. the execution stage, blockchain tree, and pruner, to communicate with the manager by sending notifications, or reading the current state of the manager, e.g. what blocks are safe to prune.The manager also reads events coming from ExEx's.
Finally, the manager is responsible for ExEx metrics.
This PR also implements actually spawning the ExEx's, the manager, as well as processing notifications from the blockchain tree.
A follow up PR will integrate this with the execution stage.