refactor(collator): rework some nits

Rexagon committed Sep 5, 2024
1 parent a8165db commit cf992af

Showing 12 changed files with 146 additions and 100 deletions.
2 changes: 1 addition & 1 deletion block-util/src/archive/reader.rs
@@ -171,7 +171,7 @@ fn read_archive_prefix(buf: &[u8], offset: &mut usize) -> Result<(), ArchiveReaderError> {

// NOTE: the `end > end + 4` comparison is needed here because it eliminates a
// useless bounds check with a panic. It is not even included in the resulting assembly
-if buf.len() < end + 4 || end > end + 4 {
+if buf.len() < end + 4 || end > end.wrapping_add(4) {
return Err(ArchiveReaderError::UnexpectedArchiveEof);
}

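The rewritten guard is worth a note: for a small positive addend, `end.wrapping_add(4)` lands *below* `end` exactly when `end + 4` would overflow `usize`, so the comparison doubles as a panic-free overflow check. A minimal sketch (illustrative, not from the repo):

```rust
fn would_overflow(end: usize) -> bool {
    // Wraps modulo 2^BITS instead of panicking; the result is smaller
    // than `end` exactly when `end + 4` does not fit in a `usize`.
    end > end.wrapping_add(4)
}

fn main() {
    assert!(!would_overflow(0));
    assert!(!would_overflow(usize::MAX - 4)); // MAX - 4 + 4 == MAX, still fits
    assert!(would_overflow(usize::MAX - 3)); // wraps around to 0
}
```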
1 change: 0 additions & 1 deletion block-util/src/queue/queue_diff.rs
@@ -91,7 +91,6 @@ pub struct QueueDiffStuff {
}

impl QueueDiffStuff {
-#[cfg(any(test, feature = "test"))]
pub fn new_empty(block_id: &BlockId) -> Self {
use std::collections::BTreeMap;

4 changes: 3 additions & 1 deletion block-util/src/state/shard_state_stuff.rs
@@ -123,7 +123,9 @@ impl ShardStateStuff {
}

pub fn get_gen_chain_time(&self) -> u64 {
-self.state().gen_utime as u64 * 1000 + self.state().gen_utime_ms as u64
+let state = self.state();
+debug_assert!(state.gen_utime_ms < 1000);
+state.gen_utime as u64 * 1000 + state.gen_utime_ms as u64
}
}

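For context: the chain time is a millisecond timestamp assembled from a seconds field and a sub-second milliseconds field, and the new `debug_assert!` pins down the invariant that the latter stays below 1000. A standalone sketch with hypothetical field types and values:

```rust
fn gen_chain_time(gen_utime: u32, gen_utime_ms: u16) -> u64 {
    // The ms part must be a fraction of a second; otherwise the
    // composed timestamp would be ambiguous.
    debug_assert!(gen_utime_ms < 1000);
    gen_utime as u64 * 1000 + gen_utime_ms as u64
}

fn main() {
    // A unix time with a 250 ms fractional part.
    assert_eq!(gen_chain_time(1_725_500_000, 250), 1_725_500_000_250);
}
```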
1 change: 1 addition & 0 deletions cli/src/node/control.rs
@@ -254,6 +254,7 @@ impl CmdDumpArchive {
anyhow::bail!("archive not found");
};

+#[allow(clippy::disallowed_types)]
let file = std::fs::OpenOptions::new()
.create(true)
.truncate(true)
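The added attribute re-enables a banned type for this one statement; presumably the workspace forbids `std::fs::OpenOptions` via clippy's `disallowed-types` configuration (an assumption — the lint config is not part of this diff). A hypothetical sketch of the pattern:

```rust
// Assuming `std::fs::OpenOptions` is listed under `disallowed-types`
// in clippy.toml (not shown in this diff), the attribute opts a single
// item back in instead of silencing the lint module-wide.
#[allow(clippy::disallowed_types)]
fn open_archive_output(path: &std::path::Path) -> std::io::Result<std::fs::File> {
    std::fs::OpenOptions::new()
        .create(true)
        .truncate(true)
        .write(true)
        .open(path)
}

fn main() -> std::io::Result<()> {
    let _file = open_archive_output(std::path::Path::new("archive.bin"))?;
    Ok(())
}
```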
4 changes: 2 additions & 2 deletions collator/src/collator/do_collate.rs
@@ -1,4 +1,4 @@
-use std::collections::{hash_map, HashMap};
+use std::collections::hash_map;
use std::sync::Arc;
use std::time::Duration;

@@ -1236,7 +1236,7 @@ impl CollatorStdImpl {
let top_shard_blocks_info_map = top_shard_blocks_info
.into_iter()
.map(|info| (info.block_id.shard, info))
-.collect::<HashMap<_, _>>();
+.collect::<FastHashMap<_, _>>();

// update existing shard descriptions for which top blocks were not changed
for (shard_id, prev_shard_descr) in collation_data_builder.shards_mut()? {
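`FastHashMap` is not defined in this diff; in codebases like this it is typically an alias for `std::collections::HashMap` with a faster, non-DoS-hardened hasher. A hedged sketch of such an alias (the `ahash` choice and the crate dependency are assumptions):

```rust
use std::collections::HashMap;

// Assumed definition for illustration; the real alias lives elsewhere
// in the workspace and may use a different hasher.
type FastHashMap<K, V> = HashMap<K, V, ahash::RandomState>;

fn main() {
    let mut top_blocks: FastHashMap<u32, &str> = FastHashMap::default();
    top_blocks.insert(1, "top block of shard 1");
    assert_eq!(top_blocks.get(&1), Some(&"top block of shard 1"));
}
```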
57 changes: 27 additions & 30 deletions collator/src/manager/blocks_cache.rs
@@ -374,21 +374,22 @@ impl BlocksCache {
pub(super) async fn store_block_from_bc(
&self,
state_node_adapter: Arc<dyn StateNodeAdapter>,
-state: &ShardStateStuff,
+state: ShardStateStuff,
) -> Result<Option<BlockCacheStoreResult>> {
let block_id = *state.block_id();

// TODO: should build entry only on insert

// load queue diff
-let (prev_block_ids, queue_diff_and_msgs) =
-utils::load_block_queue_diff_stuff(state_node_adapter.clone(), &block_id).await?;
+let loaded =
+utils::load_block_queue_diff_stuff(state_node_adapter.as_ref(), &block_id).await?;

// build entry
let entry = BlockCacheEntry::from_block_from_bc(
state,
-prev_block_ids.clone(),
-queue_diff_and_msgs,
+loaded.prev_ids,
+loaded.queue_diff,
+loaded.out_msgs,
)?;

let result = if block_id.shard.is_masterchain() {
@@ -406,7 +407,11 @@ impl BlocksCache {
return Ok(None);
}

-let stored = match guard.blocks.entry(block_id.seqno) {
+let kind;
+let send_sync_status;
+let prev_shard_blocks_ids;
+let prev_ids;
+match guard.blocks.entry(block_id.seqno) {
btree_map::Entry::Occupied(mut occupied) => {
let existing = occupied.get_mut();

@@ -439,31 +444,23 @@
}
}

-(
-existing.kind,
-existing.send_sync_status,
-VecDeque::new(),
-vec![],
-)
+kind = existing.kind;
+send_sync_status = existing.send_sync_status;
+prev_shard_blocks_ids = VecDeque::new();
+prev_ids = Vec::new();
}
btree_map::Entry::Vacant(vacant) => {
-let prev_shard_blocks_ids = entry
-.top_shard_blocks_ids_iter()
-.cloned()
-.collect::<VecDeque<_>>();
-
let inserted = vacant.insert(entry);
-(
-inserted.kind,
-inserted.send_sync_status,
-prev_shard_blocks_ids,
-prev_block_ids,
-)
+
+kind = inserted.kind;
+send_sync_status = inserted.send_sync_status;
+prev_shard_blocks_ids = inserted.top_shard_blocks_ids_iter().cloned().collect();
+prev_ids = inserted.prev_blocks_ids.clone();
}
};

// remove state from prev mc block because we need only last one
-for prev_block_id in stored.3 {
+for prev_block_id in prev_ids {
if let Some(entry) = guard.blocks.get_mut(&prev_block_id.seqno) {
if let Some(applied_block_stuff) = entry.applied_block_stuff.as_mut() {
tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
@@ -484,16 +481,16 @@

let result = BlockCacheStoreResult {
block_id,
-kind: stored.0,
-send_sync_status: stored.1,
+kind,
+send_sync_status,
last_collated_mc_block_id: guard.last_collated_mc_block_id,
applied_mc_queue_range: guard.applied_mc_queue_range,
};
drop(guard); // TODO: use scope instead

if result.kind == BlockCacheEntryKind::Received {
// traverse through including shard blocks and update their link to the containing master block
-self.set_containing_mc_block(block_id.as_short_id(), stored.2);
+self.set_containing_mc_block(block_id.as_short_id(), prev_shard_blocks_ids);
}

result
@@ -513,7 +510,7 @@
return Ok(None);
}

-let stored = match shard_cache.blocks.entry(block_id.seqno) {
+let (kind, send_sync_status) = match shard_cache.blocks.entry(block_id.seqno) {
btree_map::Entry::Occupied(mut occupied) => {
let existing = occupied.get_mut();

@@ -561,8 +558,8 @@
let mc_guard = self.masters.lock();
BlockCacheStoreResult {
block_id,
-kind: stored.0,
-send_sync_status: stored.1,
+kind,
+send_sync_status,
last_collated_mc_block_id: mc_guard.last_collated_mc_block_id,
applied_mc_queue_range: mc_guard.applied_mc_queue_range,
}
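One small pattern in this file deserves a note: the `drop(guard); // TODO: use scope instead` line above. Releasing a lock with an explicit `drop` works, but the TODO points at the scoped alternative, where the guard dies at the end of a block. A minimal illustration with made-up names (the real code uses a different mutex type):

```rust
use std::sync::Mutex;

fn main() {
    let cache = Mutex::new(vec![1u32, 2, 3]);

    // Style used in the diff: take the guard, read, drop explicitly.
    let guard = cache.lock().unwrap();
    let len_a = guard.len();
    drop(guard); // lock released here

    // Style the TODO suggests: confine the guard to a block scope.
    let len_b = {
        let guard = cache.lock().unwrap();
        guard.len()
    }; // guard dropped at the end of the block

    assert_eq!(len_a, len_b);
}
```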
80 changes: 42 additions & 38 deletions collator/src/manager/mod.rs
@@ -1,4 +1,4 @@
-use std::collections::{hash_map, BTreeMap, HashMap, VecDeque};
+use std::collections::{hash_map, BTreeMap, VecDeque};
use std::sync::Arc;

use anyhow::{anyhow, bail, Result};
@@ -31,8 +31,8 @@ use crate::queue_adapter::MessageQueueAdapter;
use crate::state_node::{StateNodeAdapter, StateNodeAdapterFactory, StateNodeEventListener};
use crate::types::{
BlockCollationResult, BlockIdExt, CollationConfig, CollationSessionId, CollationSessionInfo,
-DisplayAsShortId, DisplayBTreeMap, DisplayBlockIdsSlice, McData, ShardDescriptionExt,
-TopBlockDescription,
+DebugIter, DisplayAsShortId, DisplayBTreeMap, DisplayBlockIdsSlice, McData,
+ShardDescriptionExt, TopBlockDescription,
};
use crate::utils::async_dispatcher::{AsyncDispatcher, STANDARD_ASYNC_DISPATCHER_BUFFER_SIZE};
use crate::utils::schedule_async_action;
@@ -829,7 +829,7 @@ where

let Some(store_res) = self
.blocks_cache
-.store_block_from_bc(self.state_node_adapter.clone(), &state)
+.store_block_from_bc(self.state_node_adapter.clone(), state)
.await?
else {
self.ready_to_sync.notify_one();
@@ -1137,16 +1137,13 @@ where
Some(processed_to) => processed_to,
None => {
// try get from storage
-let (_, queue_diff_and_msgs) = utils::load_block_queue_diff_stuff(
-self.state_node_adapter.clone(),
+let loaded = utils::load_only_queue_diff_stuff(
+self.state_node_adapter.as_ref(),
&top_block_id,
)
.await?;
-if let Some((queue_diff_stuff, _)) = queue_diff_and_msgs {
-queue_diff_stuff.as_ref().processed_upto.clone()
-} else {
-bail!("Block not found in cache and storage! ({})", top_block_id)
-}
+
+loaded.as_ref().processed_upto.clone()
}
};

@@ -1303,7 +1300,7 @@ where
tracing::trace!(target: tracing_targets::COLLATION_MANAGER, "mc_data: {:?}", mc_data);

// get new shards info from updated master state
-let mut new_shards_info = HashMap::new();
+let mut new_shards_info = FastHashMap::default();
new_shards_info.insert(ShardIdent::MASTERCHAIN, vec![mc_data.block_id]);
for shard in mc_data.shards.iter() {
let (shard_id, descr) = shard?;
@@ -1345,29 +1342,35 @@ where
tracing::trace!(target: tracing_targets::COLLATION_MANAGER, "full_validators_set {:?}", full_validators_set);

// compare with active sessions and detect new sessions to start and outdated sessions to finish
-let mut sessions_to_keep = HashMap::new();
-let mut sessions_to_start = vec![];
-let mut to_finish_sessions = HashMap::new();
-let mut to_stop_collators = HashMap::new();
+let mut sessions_to_keep = Vec::new();
+let mut sessions_to_start = Vec::new();
+let mut to_finish_sessions = Vec::new();
+let mut to_stop_collators = Vec::new();
{
let mut active_collation_sessions_guard = self.active_collation_sessions.write();
let mut missed_shards_ids: FastHashSet<_> = active_shards_ids.into_iter().collect();
-for shard_info in new_shards_info {
-missed_shards_ids.remove(&shard_info.0);
-match active_collation_sessions_guard.entry(shard_info.0) {
+for (shard_ident, block_ids) in new_shards_info {
+missed_shards_ids.remove(&shard_ident);
+match active_collation_sessions_guard.entry(shard_ident) {
hash_map::Entry::Occupied(entry) => {
-let existing_session = entry.get().clone();
+let existing_session = entry.get();
if existing_session.seqno() >= new_session_seqno {
-sessions_to_keep.insert(shard_info.0, (existing_session, shard_info.1));
+sessions_to_keep.push((
+shard_ident,
+existing_session.clone(),
+block_ids,
+));
} else {
-to_finish_sessions
-.insert((shard_info.0, new_session_seqno), existing_session);
-sessions_to_start.push(shard_info);
-entry.remove();
+to_finish_sessions.push((
+shard_ident,
+new_session_seqno,
+entry.remove(),
+));
+sessions_to_start.push((shard_ident, block_ids));
}
}
hash_map::Entry::Vacant(_) => {
-sessions_to_start.push(shard_info);
+sessions_to_start.push((shard_ident, block_ids));
}
}
}
@@ -1380,10 +1383,9 @@
if let Some(current_active_session) =
active_collation_sessions_guard.remove(&shard_id)
{
-to_finish_sessions
-.insert((shard_id, new_session_seqno), current_active_session);
-if let Some(collator) = self.active_collators.remove(&shard_id) {
-to_stop_collators.insert((shard_id, new_session_seqno), collator.1);
+to_finish_sessions.push((shard_id, new_session_seqno, current_active_session));
+if let Some((_, collator)) = self.active_collators.remove(&shard_id) {
+to_stop_collators.push((shard_id, new_session_seqno, collator));
}
}
}
@@ -1392,20 +1394,20 @@
tracing::debug!(
target: tracing_targets::COLLATION_MANAGER,
"Will keep existing collation sessions: {:?}",
-sessions_to_keep.keys(),
+DebugIter(sessions_to_keep.iter().map(|(shard_ident, _, _)| *shard_ident)),
);
if !sessions_to_start.is_empty() {
tracing::info!(
target: tracing_targets::COLLATION_MANAGER,
"Will start new collation sessions: {:?}",
-sessions_to_start.iter().map(|(k, _)| k).collect::<Vec<_>>(),
+DebugIter(sessions_to_start.iter().map(|(k, _)| k)),
);
}

let cc_config = mc_data.config.get_catchain_config()?;

// update master state in existing collators and resume collation
-for (shard_id, (_, prev_blocks_ids)) in sessions_to_keep {
+for (shard_id, _, prev_blocks_ids) in sessions_to_keep {
// if there is no active collator then current node does not collate this shard
// so we do not need to do anything
let collator = {
@@ -1528,7 +1530,7 @@
shard_id,
);
if let Some((_, active_collator)) = self.active_collators.remove(&shard_id) {
-to_stop_collators.insert((shard_id, new_session_seqno), active_collator);
+to_stop_collators.push((shard_id, new_session_seqno, active_collator));
}
}

@@ -1542,12 +1544,13 @@
tracing::info!(
target: tracing_targets::COLLATION_MANAGER,
"Will finish outdated collation sessions: {:?}",
-to_finish_sessions.keys(),
+DebugIter(to_finish_sessions.iter().map(|(s, seq, _)| (s, seq))),
);
}

// enqueue outdated sessions finish tasks
-for (finish_key, session_info) in to_finish_sessions {
+for (shard_ident, session_seqno, session_info) in to_finish_sessions {
+let finish_key = (shard_ident, session_seqno);
self.collation_sessions_to_finish
.insert(finish_key, session_info.clone());
self.finish_collation_session(session_info, finish_key)
@@ -1558,12 +1561,13 @@
tracing::info!(
target: tracing_targets::COLLATION_MANAGER,
"Will stop collators for sessions that we do not serve: {:?}",
-to_stop_collators.keys(),
+DebugIter(to_stop_collators.iter().map(|(s, seq, _)| (s, seq))),
);
}

// enqueue dangling collators stop tasks
-for (stop_key, active_collator) in to_stop_collators {
+for (shard_ident, session_seqno, active_collator) in to_stop_collators {
+let stop_key = (shard_ident, session_seqno);
active_collator.collator.enqueue_stop(stop_key).await?;
self.collators_to_stop.insert(stop_key, active_collator);
}
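`DebugIter`, newly imported at the top of this file, lets the logging macros above format the session `Vec`s lazily instead of collecting keys into a temporary `Vec` first. Its definition is not part of this diff; a plausible sketch of such a helper (assumed shape — the real one lives in `crate::types` and may differ):

```rust
use std::fmt;

// Assumed shape: wraps a cloneable iterator and renders it as a list.
struct DebugIter<I>(I);

impl<I> fmt::Debug for DebugIter<I>
where
    I: Iterator + Clone,
    I::Item: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_list().entries(self.0.clone()).finish()
    }
}

fn main() {
    let shards = ["shard-a", "shard-b"];
    println!("{:?}", DebugIter(shards.iter())); // ["shard-a", "shard-b"]
}
```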
15 changes: 11 additions & 4 deletions collator/src/manager/types.rs
@@ -232,9 +232,10 @@ impl BlockCacheEntry {
}

pub fn from_block_from_bc(
-state: &ShardStateStuff,
+state: ShardStateStuff,
prev_blocks_ids: Vec<BlockId>,
-queue_diff_and_msgs: Option<(QueueDiffStuff, Lazy<OutMsgDescr>)>,
+queue_diff: QueueDiffStuff,
+out_msgs: Lazy<OutMsgDescr>,
) -> Result<Self> {
let block_id = *state.block_id();
let key = block_id.as_short_id();
@@ -251,8 +252,8 @@
}

let applied_block_stuff = AppliedBlockStuff {
-state: Some(state.clone()),
-queue_diff_and_msgs,
+state: Some(state),
+queue_diff_and_msgs: Some((queue_diff, out_msgs)),
};

Ok(Self {
@@ -377,3 +378,9 @@ impl std::fmt::Display for McBlockSubgraphExtract {
}
}
}

+pub struct LoadedQueueDiffContext {
+pub prev_ids: Vec<BlockId>,
+pub queue_diff: QueueDiffStuff,
+pub out_msgs: Lazy<OutMsgDescr>,
+}
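The new `LoadedQueueDiffContext` replaces the tuples previously returned by the queue-diff loaders, which is what lets the call sites above read named fields (`loaded.prev_ids`, `loaded.queue_diff`, `loaded.out_msgs`) instead of positions. A self-contained sketch of the idea with placeholder types (the real fields use the crate's block and queue types):

```rust
// Placeholder types standing in for the real block/queue values; only
// the struct shape mirrors the diff.
struct BlockId(u32);
struct QueueDiffStuff;
struct OutMsgs;

struct LoadedQueueDiffContext {
    prev_ids: Vec<BlockId>,
    queue_diff: QueueDiffStuff,
    out_msgs: OutMsgs,
}

fn load_queue_diff_context() -> LoadedQueueDiffContext {
    LoadedQueueDiffContext {
        prev_ids: vec![BlockId(41)],
        queue_diff: QueueDiffStuff,
        out_msgs: OutMsgs,
    }
}

fn main() {
    let loaded = load_queue_diff_context();
    // Named fields read better than `loaded.0` / `loaded.1` / `loaded.2`.
    assert_eq!(loaded.prev_ids.len(), 1);
    let _ = (loaded.queue_diff, loaded.out_msgs);
}
```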