Skip to content

Commit

Permalink
feature(int-queue): partitions
Browse files Browse the repository at this point in the history
ref(collator): change load statistics func
  • Loading branch information
drmick authored and Rexagon committed Jan 23, 2025
1 parent 3d89481 commit 5e7756d
Show file tree
Hide file tree
Showing 24 changed files with 1,085 additions and 264 deletions.
4 changes: 3 additions & 1 deletion block-util/src/queue/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
pub use self::proto::{QueueDiff, QueueKey, QueueState, QueueStateHeader, QueueStateRef};
pub use self::proto::{
QueueDiff, QueueKey, QueuePartition, QueueState, QueueStateHeader, QueueStateRef,
};
pub use self::queue_diff::{
QueueDiffMessagesIter, QueueDiffStuff, QueueDiffStuffAug, SerializedQueueDiff,
};
Expand Down
41 changes: 40 additions & 1 deletion block-util/src/queue/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use bytes::Bytes;
use everscale_types::models::*;
use everscale_types::prelude::*;
use tl_proto::{TlRead, TlWrite};
use tycho_util::FastHashMap;

use crate::tl;

Expand All @@ -30,6 +31,8 @@ pub struct QueueDiff {
pub max_message: QueueKey,
/// List of message hashes (sorted ASC).
pub messages: Vec<HashBytes>,
/// Partition router
pub partition_router: FastHashMap<IntAddr, QueuePartition>,
}

impl QueueDiff {
Expand Down Expand Up @@ -94,6 +97,8 @@ impl<'tl> TlRead<'tl> for QueueDiff {
min_message: QueueKey::read_from(data)?,
max_message: QueueKey::read_from(data)?,
messages: messages_list::read(data)?,
// TODO !!! add read for partition_router
partition_router: Default::default(),
};

if result.max_message < result.min_message {
Expand Down Expand Up @@ -147,13 +152,39 @@ pub struct QueueStateHeader {
}

/// Queue key.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, TlWrite, TlRead)]
#[derive(Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, TlWrite, TlRead)]
pub struct QueueKey {
pub lt: u64,
#[tl(with = "tl::hash_bytes")]
pub hash: HashBytes,
}

#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, TlWrite, TlRead)]
#[tl(boxed)]
pub enum QueuePartition {
#[tl(id = 0)]
NormalPriority = 0,
#[tl(id = 1)]
LowPriority = 1,
}

impl Default for QueuePartition {
fn default() -> Self {
Self::NormalPriority
}
}

impl From<u8> for QueuePartition {
fn from(value: u8) -> Self {
match value {
0 => Self::NormalPriority,
1 => Self::LowPriority,
_ => panic!("Invalid value for QueuePartition"),
}
}
}

impl QueueKey {
const SIZE_HINT: usize = 8 + 32;

Expand Down Expand Up @@ -201,6 +232,12 @@ impl From<QueueKey> for (u64, HashBytes) {
}
}

impl std::fmt::Debug for QueueKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self)
}
}

impl std::fmt::Display for QueueKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "LT_HASH({}_{})", self.lt, self.hash)
Expand Down Expand Up @@ -470,6 +507,7 @@ mod tests {
HashBytes::from([0x02; 32]),
HashBytes::from([0x03; 32]),
],
partition_router: Default::default(),
};

let bytes = tl_proto::serialize(&diff);
Expand Down Expand Up @@ -516,6 +554,7 @@ mod tests {
HashBytes::from([0x02; 32]),
HashBytes::from([0x03; 32]),
],
partition_router: Default::default(),
};

// NOTE: We need this for the hash computation.
Expand Down
5 changes: 4 additions & 1 deletion block-util/src/queue/queue_diff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use everscale_types::prelude::*;
use tl_proto::TlRead;

use crate::archive::WithArchiveData;
use crate::queue::proto::{QueueDiff, QueueKey};
use crate::queue::proto::{QueueDiff, QueueKey, QueuePartition};

pub type QueueDiffStuffAug = WithArchiveData<QueueDiffStuff>;

Expand Down Expand Up @@ -104,6 +104,7 @@ impl QueueDiffStuff {
min_message: QueueKey::MIN,
max_message: QueueKey::MIN,
messages: Vec::new(),
partition_router: Default::default(),
},
}),
}
Expand Down Expand Up @@ -136,6 +137,7 @@ impl QueueDiffStuff {
min_message: Default::default(),
max_message: Default::default(),
messages: Default::default(),
partition_router: Default::default(),
},
}),
}
Expand Down Expand Up @@ -369,6 +371,7 @@ mod tests {
hash: message_hashes[9],
},
messages: message_hashes.clone(),
partition_router: Default::default(),
},
}),
};
Expand Down
2 changes: 1 addition & 1 deletion collator/src/collator/debug_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use everscale_types::models::{
};
use tycho_util::FastHashMap;

use crate::types::DebugDisplay;
use crate::types::processed_upto::ProcessedUptoInfoStuff;
use crate::types::DebugDisplay;

pub struct BlockDebugInfo<'a> {
pub block_id: &'a BlockId,
Expand Down
3 changes: 2 additions & 1 deletion collator/src/collator/do_collate/finalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,12 @@ impl Phase<FinalizeState> {
&labels,
);

let statistics = (queue_diff_with_msgs.clone(), block_id_short.shard).into();
mq_adapter.apply_diff(
queue_diff_with_msgs,
block_id_short,
&queue_diff_hash,
max_message,
statistics,
)?;
let apply_queue_diff_elapsed = histogram.finish();

Expand Down
21 changes: 12 additions & 9 deletions collator/src/collator/messages_reader/internals_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ use std::sync::Arc;
use anyhow::{bail, Context, Result};
use everscale_types::models::{MsgInfo, ShardIdent};
use tycho_block_util::queue::QueueKey;
use tycho_util::FastHashMap;

use super::super::messages_buffer::{MessagesBufferLimits, MessagesBufferV2};
use super::super::types::{Dequeued, ParsedMessage};
use super::{InternalsRangeReaderState, PartitionReaderState, ShardReaderState};
use crate::internal_queue::iterator::{IterItem, QueueIterator};
use crate::internal_queue::types::EnqueuedMessage;
use crate::internal_queue::types::{EnqueuedMessage, QueueShardRange};
use crate::queue_adapter::MessageQueueAdapter;
use crate::tracing_targets;
use crate::types::processed_upto::{BlockSeqno, Lt, PartitionId};
Expand Down Expand Up @@ -495,18 +494,22 @@ impl InternalsRangeReader {
fn init(&mut self) -> Result<()> {
// do not init iterator if range is fully read
if !self.fully_read {
let mut ranges_from = FastHashMap::default();
let mut ranges_to = FastHashMap::default();
let mut ranges = vec![];

for (shard_id, shard_reader_state) in &self.reader_state.shards {
let shard_range_to = QueueKey::max_for_lt(shard_reader_state.to);
ranges_from.insert(*shard_id, shard_reader_state.current_position);
ranges_to.insert(*shard_id, shard_range_to);
ranges.push(QueueShardRange {
shard_ident: *shard_id,
from: shard_reader_state.current_position,
to: shard_range_to,
});
}

let iterator =
self.mq_adapter
.create_iterator(self.for_shard_id, ranges_from, ranges_to)?;
let iterator = self.mq_adapter.create_iterator(
self.for_shard_id,
self.partition_id.into(),
ranges,
)?;

self.iterator_opt = Some(iterator);
}
Expand Down
1 change: 1 addition & 0 deletions collator/src/collator/messages_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ impl<V: InternalMessageValue> NewMessagesState<V> {
queue_diff_with_msgs: QueueDiffWithMessages {
messages: Default::default(),
processed_to: Default::default(),
partition_router: Default::default(),
},
messages_for_current_shard: Default::default(),
max_message_key_for_current_shard: QueueKey::MIN,
Expand Down
29 changes: 15 additions & 14 deletions collator/src/collator/tests/execution_manager_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use everscale_types::models::{
BlockId, BlockIdShort, BlockchainConfig, CurrencyCollection, ExternalsProcessedUpto,
ShardDescription, ShardIdent, ShardStateUnsplit, ValidatorInfo,
};
use tycho_block_util::queue::QueueKey;
use tycho_block_util::queue::{QueueKey, QueuePartition};
use tycho_block_util::state::{MinRefMcStateTracker, ShardStateStuff};
use tycho_util::FastHashMap;

Expand All @@ -22,7 +22,8 @@ use super::{
};
use crate::internal_queue::iterator::{IterItem, QueueIterator};
use crate::internal_queue::types::{
EnqueuedMessage, InternalMessageValue, QueueDiffWithMessages, QueueFullDiff,
EnqueuedMessage, InternalMessageValue, QueueDiffWithMessages, QueueFullDiff, QueueRange,
ShardPartition,
};
use crate::queue_adapter::MessageQueueAdapter;
use crate::test_utils::try_init_test_tracing;
Expand Down Expand Up @@ -50,11 +51,7 @@ impl<V: InternalMessageValue> QueueIterator<V> for QueueIteratorTestImpl<V> {
unimplemented!()
}

fn current_position(&self) -> FastHashMap<ShardIdent, QueueKey> {
unimplemented!()
}

fn add_message(&mut self, _message: V) -> Result<()> {
fn current_position(&self) -> FastHashMap<ShardPartition, QueueKey> {
unimplemented!()
}

Expand All @@ -70,9 +67,13 @@ impl<V: InternalMessageValue> QueueIterator<V> for QueueIteratorTestImpl<V> {
unimplemented!()
}

fn commit(&mut self, _messages: Vec<(ShardIdent, QueueKey)>) -> Result<()> {
fn commit(&mut self, _messages: Vec<(ShardPartition, QueueKey)>) -> Result<()> {
Ok(())
}

fn add_message(&mut self, _partition: QueuePartition, _message: V) -> Result<()> {
unimplemented!()
}
}

#[derive(Default)]
Expand All @@ -85,9 +86,8 @@ struct MessageQueueAdapterTestImpl<V: InternalMessageValue> {
impl<V: InternalMessageValue + Default> MessageQueueAdapter<V> for MessageQueueAdapterTestImpl<V> {
fn create_iterator(
&self,
_for_shard_id: ShardIdent,
_shards_from: FastHashMap<ShardIdent, QueueKey>,
_shards_to: FastHashMap<ShardIdent, QueueKey>,
for_shard_id: ShardIdent,
ranges: Vec<QueueRange>,
) -> Result<Box<dyn QueueIterator<V>>> {
Ok(Box::new(QueueIteratorTestImpl::default()))
}
Expand All @@ -108,16 +108,17 @@ impl<V: InternalMessageValue + Default> MessageQueueAdapter<V> for MessageQueueA

fn add_message_to_iterator(
&self,
_iterator: &mut Box<dyn QueueIterator<V>>,
_message: V,
iterator: &mut Box<dyn QueueIterator<V>>,
partition: QueuePartition,
message: V,
) -> Result<()> {
unimplemented!()
}

fn commit_messages_to_iterator(
&self,
_iterator: &mut Box<dyn QueueIterator<V>>,
_messages: Vec<(ShardIdent, QueueKey)>,
_messages: Vec<(ShardPartition, QueueKey)>,
) -> Result<()> {
unimplemented!()
}
Expand Down
42 changes: 21 additions & 21 deletions collator/src/internal_queue/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,27 +69,27 @@ fn gc_task<V: InternalMessageValue>(
committed_state: Arc<dyn CommittedState<V>>,
delete_until: HashMap<ShardIdent, QueueKey>,
) {
let _histogram = HistogramGuard::begin("tycho_internal_queue_gc_execute_task_time");

let mut gc_state = gc_state.lock().unwrap();

for (shard, current_last_key) in delete_until.iter() {
let can_delete = gc_state
.get(shard)
.map_or(true, |last_key| *current_last_key > *last_key);

if can_delete {
if let Err(e) = committed_state.delete_messages(*shard, current_last_key) {
tracing::error!(target: tracing_targets::MQ, "failed to delete messages: {e:?}");
}

let labels = [("workchain", shard.workchain().to_string())];
metrics::gauge!("tycho_internal_queue_processed_upto", &labels)
.set(current_last_key.lt as f64);

gc_state.insert(*shard, *current_last_key);
}
}
// let _histogram = HistogramGuard::begin("tycho_internal_queue_gc_execute_task_time");
//
// let mut gc_state = gc_state.lock().unwrap();
//
// for (shard, current_last_key) in delete_until.iter() {
// let can_delete = gc_state
// .get(shard)
// .map_or(true, |last_key| *current_last_key > *last_key);
//
// if can_delete {
// if let Err(e) = committed_state.delete_messages(*shard, current_last_key) {
// tracing::error!(target: tracing_targets::MQ, "failed to delete messages: {e:?}");
// }
//
// let labels = [("workchain", shard.workchain().to_string())];
// metrics::gauge!("tycho_internal_queue_processed_upto", &labels)
// .set(current_last_key.lt as f64);
//
// gc_state.insert(*shard, *current_last_key);
// }
// }
}

type GcRange = HashMap<ShardIdent, QueueKey>;
11 changes: 6 additions & 5 deletions collator/src/internal_queue/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;

use anyhow::Result;
use everscale_types::models::ShardIdent;
use tycho_block_util::queue::QueueKey;
use tycho_block_util::queue::{QueueKey, QueuePartition};
use tycho_util::FastHashMap;

use crate::internal_queue::state::state_iterator::{IterRange, MessageExt};
Expand Down Expand Up @@ -35,6 +35,7 @@ pub trait QueueIterator<V: InternalMessageValue>: Send {

pub struct QueueIteratorImpl<V: InternalMessageValue> {
for_shard: ShardIdent,
// partition: QueuePartition,
messages_for_current_shard: BinaryHeap<Reverse<MessageExt<V>>>,
new_messages: BTreeMap<QueueKey, Arc<V>>,
iterators_manager: StatesIteratorsManager<V>,
Expand Down Expand Up @@ -128,9 +129,9 @@ impl<V: InternalMessageValue> QueueIterator<V> for QueueIteratorImpl<V> {
let mut diff = QueueDiffWithMessages::new();

// fill processed_upto
for (shard_id, message_key) in self.last_processed_message.iter() {
for (shard_ident, message_key) in self.last_processed_message.iter() {
// TODO: may be `diff.processed_upto` should be a HashMap and we can consume it from iterator
diff.processed_to.insert(*shard_id, *message_key);
diff.processed_to.insert(*shard_ident, *message_key);
}

// move new messages
Expand Down Expand Up @@ -160,9 +161,9 @@ impl<V: InternalMessageValue> QueueIterator<V> for QueueIteratorImpl<V> {
// actually we update last processed message via commit()
// during the execution, so we can just use value as is

for (shard_id, message_key) in self.last_processed_message.iter() {
for (shard_ident, message_key) in self.last_processed_message.iter() {
// TODO: may be `diff.processed_upto` should be a HashMap and we can consume it from iterator
diff.processed_to.insert(*shard_id, *message_key);
diff.processed_to.insert(*shard_ident, *message_key);
}

diff.messages = self.new_messages.clone();
Expand Down
Loading

0 comments on commit 5e7756d

Please sign in to comment.