Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: poll loop improvements #7609

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 40 additions & 13 deletions crates/exex/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,20 @@ impl ExExHandle {
)
}

/// Reserves a slot in the `PollSender` channel and sends the notification if the slot was
/// Attempts to prepare the sender to receive a value.
/// See also [PollSender::poll_reserve]
fn poll_reserve(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), PollSendError<CanonStateNotification>>> {
self.sender.poll_reserve(cx)
}

/// Sends a slot in the `PollSender` channel and sends the notification if the slot was
/// successfully reserved.
///
/// When the notification is sent, it is considered delivered.
///
/// If poll_reserve was not successfully called prior to calling send_item, then this method will panic.
fn send(
&mut self,
cx: &mut Context<'_>,
(event_id, notification): &(usize, CanonStateNotification),
) -> Poll<Result<(), PollSendError<CanonStateNotification>>> {
// check that this notification is above the finished height of the exex if the exex has set
Expand All @@ -99,11 +106,6 @@ impl ExExHandle {
}
}

match self.sender.poll_reserve(cx) {
Poll::Ready(Ok(())) => (),
other => return other,
}

match self.sender.send_item(notification.clone()) {
Ok(()) => {
self.next_notification_id = event_id + 1;
Expand Down Expand Up @@ -271,11 +273,30 @@ impl Future for ExExManager {
// update capacity
self.update_capacity();

// advance all poll senders
let mut min_id = usize::MAX;
// This keeps track of the lowest notification ID that has been delivered to all ExExs.
let mut min_notification_id = usize::MAX;

// advance all exexs and send them notifications
for idx in (0..self.exex_handles.len()).rev() {
let mut exex = self.exex_handles.swap_remove(idx);

// drain notifications
loop {
// TODO perhaps this should not be in this loop but in the loop above that we exit if all exex are pending
// ensure the exex has capacity
match exex.poll_reserve(cx) {
Poll::Ready(Ok(())) => {}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
Poll::Pending => break,
}

// TODO try send next notification to exex if we have it


break
}


// it is a logic error for this to ever underflow since the manager manages the
// notification IDs
let notification_id = exex
Expand All @@ -289,14 +310,14 @@ impl Future for ExExManager {
return Poll::Ready(Err(err.into()))
}
}
min_id = min_id.min(exex.next_notification_id);
min_notification_id = min_notification_id.min(exex.next_notification_id);
self.exex_handles.push(exex);
}

// remove processed buffered notifications
self.buffer.retain(|&(id, _)| id >= min_id);
self.min_id = min_id;
debug!(min_id, "lowest notification id in buffer updated");
self.buffer.retain(|&(id, _)| id >= min_notification_id);
self.min_id = min_notification_id;
debug!(min_notification_id, "lowest notification id in buffer updated");

// update capacity
self.update_capacity();
Expand Down Expand Up @@ -329,6 +350,12 @@ impl Future for ExExManager {
let _ = self.finished_height.send(Some(finished_height));
}

// TODO this pending is potentially dangerous because of this scenario;
// 1. self.handle_rx.poll_recv(cx) is not fully drained
// 2. loop {self.exex_handles.swap_remove(idx) } drains the entire buffer
// 3. no waker is no set
// TODO: wrap the entire function in loop that only returns pending if the buffer is __not__ empty

Poll::Pending
}
}
Expand Down
Loading