Skip to content

Commit

Permalink
feat(mempool): add scopeguard
Browse files Browse the repository at this point in the history
  • Loading branch information
Mododo committed Oct 31, 2024
1 parent 807d3b7 commit a5e2539
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 17 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions collator/src/mempool/impls/std_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ impl MempoolAdapterStdImpl {
UnappliedConfig::apply_vset(&handle, last_state_update)?;

tokio::spawn(async move {
scopeguard::defer!(tracing::warn!(
target: tracing_targets::MEMPOOL_ADAPTER,
"mempool engine stopped"
));
engine.run().await;
});

Expand All @@ -222,6 +226,10 @@ impl MempoolAdapterStdImpl {
config: ConsensusConfig,
mut anchor_rx: mpsc::UnboundedReceiver<CommitResult>,
) {
scopeguard::defer!(tracing::warn!(
target: tracing_targets::MEMPOOL_ADAPTER,
"handle anchors task stopped"
));
let mut parser = Parser::new(config.deduplicate_rounds);
let mut first_after_gap = None;
while let Some(commit) = anchor_rx.recv().await {
Expand Down
1 change: 1 addition & 0 deletions consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ parking_lot = { workspace = true }
rand = { workspace = true }
rand_pcg = { workspace = true }
rayon = { workspace = true }
scopeguard = { workspace = true }
serde = { workspace = true, features = ["derive"] }
tl-proto = { workspace = true }
tokio = { workspace = true, default-features = false }
Expand Down
1 change: 1 addition & 0 deletions consensus/src/engine/input_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ impl InputBufferImpl {
inner: Arc<Mutex<InputBufferData>>,
mut externals: mpsc::UnboundedReceiver<Bytes>,
) {
scopeguard::defer!(tracing::warn!("externals input buffer task stopped"));
while let Some(payload) = externals.recv().await {
let mut data = inner.lock();
data.add(payload);
Expand Down
17 changes: 13 additions & 4 deletions consensus/src/intercom/peer_schedule/impl_.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ impl PeerSchedule {
stateless.next_epoch_start = Some(next_round);
});

tracing::info!(
"peer schedule next subset updated for {next_round:?} {:?} {:?}",
self.atomic().alt(),
locked.data,
);

true
}

Expand All @@ -102,7 +108,7 @@ impl PeerSchedule {
let mut locked = self.write();
tracing::debug!(
"peer schedule before rotation for {current:?}: {:?} {:?}",
self.atomic(),
self.atomic().alt(),
locked.data,
);

Expand All @@ -116,8 +122,8 @@ impl PeerSchedule {
stateless.rotate();
});
tracing::info!(
"peer schedule rotated for {current:?}: {:?} {:?}",
self.atomic(),
"peer schedule rotated for {current:?} {:?} {:?}",
self.atomic().alt(),
locked.data,
);
}
Expand Down Expand Up @@ -152,6 +158,7 @@ impl PeerSchedule {

pub async fn run_updater(self) {
tracing::info!("starting peer schedule updates");
scopeguard::defer!(tracing::warn!("peer schedule updater stopped"));
let (local_id, mut rx) = {
let mut guard = self.write();

Expand Down Expand Up @@ -236,11 +243,13 @@ impl PeerSchedule {
impl Future<Output = KnownPeerHandle> + Sized + Send + 'static,
>,
) -> Option<JoinTask<()>> {
tracing::info!("restart resolve task");
if resolved_waiters.is_empty() {
tracing::info!("peer schedule resolve task not started: all peers resolved");
None
} else {
let join_task = JoinTask::new(async move {
tracing::info!("peer schedule resolve task started");
scopeguard::defer!(tracing::info!("peer schedule resolve task stopped"));
while let Some(known_peer_handle) = resolved_waiters.next().await {
_ = self
.write()
Expand Down
31 changes: 18 additions & 13 deletions consensus/src/intercom/peer_schedule/stateless.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use everscale_crypto::ed25519::KeyPair;
use tycho_network::PeerId;
use tycho_util::FastHashSet;

use crate::effects::AltFormat;
use crate::effects::{AltFmt, AltFormat};
use crate::models::Round;

#[derive(Clone)]
Expand Down Expand Up @@ -83,7 +83,7 @@ impl PeerScheduleStateless {
&self.empty_set
};
if result.is_empty() {
tracing::error!("empty peer set for {round:?}: {self:?}");
tracing::error!("empty peer set for {round:?}: {:?}", self.alt());
}
result
}
Expand All @@ -99,7 +99,7 @@ impl PeerScheduleStateless {
&self.empty_vec
};
if result.is_empty() {
tracing::error!("empty peer set for {round:?}: {self:?}");
tracing::error!("empty peer set for {round:?}: {:?}", self.alt());
}
result
}
Expand All @@ -113,17 +113,19 @@ impl PeerScheduleStateless {
pub(super) fn rotate(&mut self) {
assert!(
self.peer_sets[0].is_empty() && self.peer_vecs[0].is_empty(),
"previous peer set was not cleaned; {self:?}"
"previous peer set was not cleaned; {:?}",
self.alt()
);
// make next from previous
let next = self
.next_epoch_start
.ok_or_else(|| format!("{self:?}"))
.ok_or_else(|| format!("{:?}", self.alt()))
.expect("attempt to change epoch, but next epoch start is not set");

assert!(
next > self.cur_epoch_start,
"next start is not in future {self:?}"
"next start is not in future {:?}",
self.alt()
);

self.prev_epoch_start = self.cur_epoch_start;
Expand All @@ -140,19 +142,22 @@ impl PeerScheduleStateless {
}
}

impl std::fmt::Debug for PeerScheduleStateless {
impl AltFormat for PeerScheduleStateless {}
impl std::fmt::Debug for AltFmt<'_, PeerScheduleStateless> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let inner = AltFormat::unpack(self);

f.write_str("PeerScheduleStateless { ")?;

write!(f, "prev: {{ start: {}, ", self.prev_epoch_start.0)?;
write!(f, "{} }}, ", self.peer_vecs[0].as_slice().alt())?;
write!(f, "prev: {{ start: {}, ", inner.prev_epoch_start.0)?;
write!(f, "{} }}, ", inner.peer_vecs[0].as_slice().alt())?;

write!(f, "current: {{ start: {}, ", self.cur_epoch_start.0)?;
write!(f, "{} }}, ", self.peer_vecs[1].as_slice().alt())?;
write!(f, "current: {{ start: {}, ", inner.cur_epoch_start.0)?;
write!(f, "{} }}, ", inner.peer_vecs[1].as_slice().alt())?;

let next_epoch_start = self.next_epoch_start.map(|a| a.0);
let next_epoch_start = inner.next_epoch_start.map(|a| a.0);
write!(f, "next: {{ start: {:?}, ", next_epoch_start)?;
write!(f, "{} }} ", self.peer_vecs[2].as_slice().alt())?;
write!(f, "{} }} ", inner.peer_vecs[2].as_slice().alt())?;

f.write_str("}")
}
Expand Down

0 comments on commit a5e2539

Please sign in to comment.