diff --git a/block-util/src/queue/mod.rs b/block-util/src/queue/mod.rs index fc979c97b..e3ffd052a 100644 --- a/block-util/src/queue/mod.rs +++ b/block-util/src/queue/mod.rs @@ -1,4 +1,6 @@ -pub use self::proto::{QueueDiff, QueueKey, QueueState, QueueStateHeader, QueueStateRef}; +pub use self::proto::{ + QueueDiff, QueueKey, QueuePartition, QueueState, QueueStateHeader, QueueStateRef, +}; pub use self::queue_diff::{ QueueDiffMessagesIter, QueueDiffStuff, QueueDiffStuffAug, SerializedQueueDiff, }; diff --git a/block-util/src/queue/proto.rs b/block-util/src/queue/proto.rs index 3678cf0d0..8fc6eee4d 100644 --- a/block-util/src/queue/proto.rs +++ b/block-util/src/queue/proto.rs @@ -4,6 +4,7 @@ use bytes::Bytes; use everscale_types::models::*; use everscale_types::prelude::*; use tl_proto::{TlRead, TlWrite}; +use tycho_util::FastHashMap; use crate::tl; @@ -30,6 +31,8 @@ pub struct QueueDiff { pub max_message: QueueKey, /// List of message hashes (sorted ASC). pub messages: Vec, + /// Partition router + pub partition_router: FastHashMap, } impl QueueDiff { @@ -94,6 +97,8 @@ impl<'tl> TlRead<'tl> for QueueDiff { min_message: QueueKey::read_from(data)?, max_message: QueueKey::read_from(data)?, messages: messages_list::read(data)?, + // TODO !!! add read for partition_router + partition_router: Default::default(), }; if result.max_message < result.min_message { @@ -147,13 +152,39 @@ pub struct QueueStateHeader { } /// Queue key. -#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, TlWrite, TlRead)] +#[derive(Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, TlWrite, TlRead)] pub struct QueueKey { pub lt: u64, #[tl(with = "tl::hash_bytes")] pub hash: HashBytes, } +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, TlWrite, TlRead)] +#[tl(boxed)] +pub enum QueuePartition { + #[tl(id = 0)] + NormalPriority = 0, + #[tl(id = 1)] + LowPriority = 1, +} + +impl Default for QueuePartition { + fn default() -> Self { + Self::NormalPriority + } +} + +impl From for QueuePartition { + fn from(value: u8) -> Self { + match value { + 0 => Self::NormalPriority, + 1 => Self::LowPriority, + _ => panic!("Invalid value for QueuePartition"), + } + } +} + impl QueueKey { const SIZE_HINT: usize = 8 + 32; @@ -201,6 +232,12 @@ impl From for (u64, HashBytes) { } } +impl std::fmt::Debug for QueueKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self) + } +} + impl std::fmt::Display for QueueKey { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "LT_HASH({}_{})", self.lt, self.hash) @@ -470,6 +507,7 @@ mod tests { HashBytes::from([0x02; 32]), HashBytes::from([0x03; 32]), ], + partition_router: Default::default(), }; let bytes = tl_proto::serialize(&diff); @@ -516,6 +554,7 @@ mod tests { HashBytes::from([0x02; 32]), HashBytes::from([0x03; 32]), ], + partition_router: Default::default(), }; // NOTE: We need this for the hash computation. diff --git a/block-util/src/queue/queue_diff.rs b/block-util/src/queue/queue_diff.rs index c8ae54bab..d776727ef 100644 --- a/block-util/src/queue/queue_diff.rs +++ b/block-util/src/queue/queue_diff.rs @@ -8,7 +8,7 @@ use everscale_types::prelude::*; use tl_proto::TlRead; use crate::archive::WithArchiveData; -use crate::queue::proto::{QueueDiff, QueueKey}; +use crate::queue::proto::{QueueDiff, QueueKey, QueuePartition}; pub type QueueDiffStuffAug = WithArchiveData; @@ -104,6 +104,7 @@ impl QueueDiffStuff { min_message: QueueKey::MIN, max_message: QueueKey::MIN, messages: Vec::new(), + partition_router: Default::default(), }, }), } @@ -136,6 +137,7 @@ impl QueueDiffStuff { min_message: Default::default(), max_message: Default::default(), messages: Default::default(), + partition_router: Default::default(), }, }), } @@ -369,6 +371,7 @@ mod tests { hash: message_hashes[9], }, messages: message_hashes.clone(), + partition_router: Default::default(), }, }), }; diff --git a/collator/src/collator/debug_info.rs b/collator/src/collator/debug_info.rs index 83a17d818..f5cbec7e9 100644 --- a/collator/src/collator/debug_info.rs +++ b/collator/src/collator/debug_info.rs @@ -5,8 +5,8 @@ use everscale_types::models::{ }; use tycho_util::FastHashMap; -use crate::types::DebugDisplay; use crate::types::processed_upto::ProcessedUptoInfoStuff; +use crate::types::DebugDisplay; pub struct BlockDebugInfo<'a> { pub block_id: &'a BlockId, diff --git a/collator/src/collator/do_collate/finalize.rs b/collator/src/collator/do_collate/finalize.rs index a9b4c21c8..653e3e09b 100644 --- a/collator/src/collator/do_collate/finalize.rs +++ b/collator/src/collator/do_collate/finalize.rs @@ -129,11 +129,12 @@ impl Phase { &labels, ); + let statistics = (queue_diff_with_msgs.clone(), block_id_short.shard).into(); mq_adapter.apply_diff( queue_diff_with_msgs, block_id_short, &queue_diff_hash, - max_message, + statistics, )?; let apply_queue_diff_elapsed = histogram.finish(); diff --git a/collator/src/collator/messages_reader/internals_reader.rs b/collator/src/collator/messages_reader/internals_reader.rs index 4a1e03592..0e1d2eaaa 100644 --- a/collator/src/collator/messages_reader/internals_reader.rs +++ b/collator/src/collator/messages_reader/internals_reader.rs @@ -4,13 +4,12 @@ use std::sync::Arc; use anyhow::{bail, Context, Result}; use everscale_types::models::{MsgInfo, ShardIdent}; use tycho_block_util::queue::QueueKey; -use tycho_util::FastHashMap; use super::super::messages_buffer::{MessagesBufferLimits, MessagesBufferV2}; use super::super::types::{Dequeued, ParsedMessage}; use super::{InternalsRangeReaderState, PartitionReaderState, ShardReaderState}; use crate::internal_queue::iterator::{IterItem, QueueIterator}; -use crate::internal_queue::types::EnqueuedMessage; +use crate::internal_queue::types::{EnqueuedMessage, QueueShardRange}; use crate::queue_adapter::MessageQueueAdapter; use crate::tracing_targets; use crate::types::processed_upto::{BlockSeqno, Lt, PartitionId}; @@ -495,18 +494,22 @@ impl InternalsRangeReader { fn init(&mut self) -> Result<()> { // do not init iterator if range is fully read if !self.fully_read { - let mut ranges_from = FastHashMap::default(); - let mut ranges_to = FastHashMap::default(); + let mut ranges = vec![]; for (shard_id, shard_reader_state) in &self.reader_state.shards { let shard_range_to = QueueKey::max_for_lt(shard_reader_state.to); - ranges_from.insert(*shard_id, shard_reader_state.current_position); - ranges_to.insert(*shard_id, shard_range_to); + ranges.push(QueueShardRange { + shard_ident: *shard_id, + from: shard_reader_state.current_position, + to: shard_range_to, + }); } - let iterator = - self.mq_adapter - .create_iterator(self.for_shard_id, ranges_from, ranges_to)?; + let iterator = self.mq_adapter.create_iterator( + self.for_shard_id, + self.partition_id.into(), + ranges, + )?; self.iterator_opt = Some(iterator); } diff --git a/collator/src/collator/messages_reader/mod.rs b/collator/src/collator/messages_reader/mod.rs index 391719415..624e3e43c 100644 --- a/collator/src/collator/messages_reader/mod.rs +++ b/collator/src/collator/messages_reader/mod.rs @@ -526,6 +526,7 @@ impl NewMessagesState { queue_diff_with_msgs: QueueDiffWithMessages { messages: Default::default(), processed_to: Default::default(), + partition_router: Default::default(), }, messages_for_current_shard: Default::default(), max_message_key_for_current_shard: QueueKey::MIN, diff --git a/collator/src/collator/tests/execution_manager_tests.rs b/collator/src/collator/tests/execution_manager_tests.rs index 7ee7e6d8c..a0007cbdb 100644 --- a/collator/src/collator/tests/execution_manager_tests.rs +++ b/collator/src/collator/tests/execution_manager_tests.rs @@ -9,7 +9,7 @@ use everscale_types::models::{ BlockId, BlockIdShort, BlockchainConfig, CurrencyCollection, ExternalsProcessedUpto, ShardDescription, ShardIdent, ShardStateUnsplit, ValidatorInfo, }; -use tycho_block_util::queue::QueueKey; +use tycho_block_util::queue::{QueueKey, QueuePartition}; use tycho_block_util::state::{MinRefMcStateTracker, ShardStateStuff}; use tycho_util::FastHashMap; @@ -22,7 +22,8 @@ use super::{ }; use crate::internal_queue::iterator::{IterItem, QueueIterator}; use crate::internal_queue::types::{ - EnqueuedMessage, InternalMessageValue, QueueDiffWithMessages, QueueFullDiff, + EnqueuedMessage, InternalMessageValue, QueueDiffWithMessages, QueueFullDiff, QueueRange, + ShardPartition, }; use crate::queue_adapter::MessageQueueAdapter; use crate::test_utils::try_init_test_tracing; @@ -50,11 +51,7 @@ impl QueueIterator for QueueIteratorTestImpl { unimplemented!() } - fn current_position(&self) -> FastHashMap { - unimplemented!() - } - - fn add_message(&mut self, _message: V) -> Result<()> { + fn current_position(&self) -> FastHashMap { unimplemented!() } @@ -70,9 +67,13 @@ impl QueueIterator for QueueIteratorTestImpl { unimplemented!() } - fn commit(&mut self, _messages: Vec<(ShardIdent, QueueKey)>) -> Result<()> { + fn commit(&mut self, _messages: Vec<(ShardPartition, QueueKey)>) -> Result<()> { Ok(()) } + + fn add_message(&mut self, _partition: QueuePartition, _message: V) -> Result<()> { + unimplemented!() + } } #[derive(Default)] @@ -85,9 +86,8 @@ struct MessageQueueAdapterTestImpl { impl MessageQueueAdapter for MessageQueueAdapterTestImpl { fn create_iterator( &self, - _for_shard_id: ShardIdent, - _shards_from: FastHashMap, - _shards_to: FastHashMap, + for_shard_id: ShardIdent, + ranges: Vec, ) -> Result>> { Ok(Box::new(QueueIteratorTestImpl::default())) } @@ -108,8 +108,9 @@ impl MessageQueueAdapter for MessageQueueA fn add_message_to_iterator( &self, - _iterator: &mut Box>, - _message: V, + iterator: &mut Box>, + partition: QueuePartition, + message: V, ) -> Result<()> { unimplemented!() } @@ -117,7 +118,7 @@ impl MessageQueueAdapter for MessageQueueA fn commit_messages_to_iterator( &self, _iterator: &mut Box>, - _messages: Vec<(ShardIdent, QueueKey)>, + _messages: Vec<(ShardPartition, QueueKey)>, ) -> Result<()> { unimplemented!() } diff --git a/collator/src/internal_queue/gc.rs b/collator/src/internal_queue/gc.rs index 989b37b7c..00318f5df 100644 --- a/collator/src/internal_queue/gc.rs +++ b/collator/src/internal_queue/gc.rs @@ -69,27 +69,27 @@ fn gc_task( committed_state: Arc>, delete_until: HashMap, ) { - let _histogram = HistogramGuard::begin("tycho_internal_queue_gc_execute_task_time"); - - let mut gc_state = gc_state.lock().unwrap(); - - for (shard, current_last_key) in delete_until.iter() { - let can_delete = gc_state - .get(shard) - .map_or(true, |last_key| *current_last_key > *last_key); - - if can_delete { - if let Err(e) = committed_state.delete_messages(*shard, current_last_key) { - tracing::error!(target: tracing_targets::MQ, "failed to delete messages: {e:?}"); - } - - let labels = [("workchain", shard.workchain().to_string())]; - metrics::gauge!("tycho_internal_queue_processed_upto", &labels) - .set(current_last_key.lt as f64); - - gc_state.insert(*shard, *current_last_key); - } - } + // let _histogram = HistogramGuard::begin("tycho_internal_queue_gc_execute_task_time"); + // + // let mut gc_state = gc_state.lock().unwrap(); + // + // for (shard, current_last_key) in delete_until.iter() { + // let can_delete = gc_state + // .get(shard) + // .map_or(true, |last_key| *current_last_key > *last_key); + // + // if can_delete { + // if let Err(e) = committed_state.delete_messages(*shard, current_last_key) { + // tracing::error!(target: tracing_targets::MQ, "failed to delete messages: {e:?}"); + // } + // + // let labels = [("workchain", shard.workchain().to_string())]; + // metrics::gauge!("tycho_internal_queue_processed_upto", &labels) + // .set(current_last_key.lt as f64); + // + // gc_state.insert(*shard, *current_last_key); + // } + // } } type GcRange = HashMap; diff --git a/collator/src/internal_queue/iterator.rs b/collator/src/internal_queue/iterator.rs index b337e5a37..359c709df 100644 --- a/collator/src/internal_queue/iterator.rs +++ b/collator/src/internal_queue/iterator.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use anyhow::Result; use everscale_types::models::ShardIdent; -use tycho_block_util::queue::QueueKey; +use tycho_block_util::queue::{QueueKey, QueuePartition}; use tycho_util::FastHashMap; use crate::internal_queue::state::state_iterator::{IterRange, MessageExt}; @@ -35,6 +35,7 @@ pub trait QueueIterator: Send { pub struct QueueIteratorImpl { for_shard: ShardIdent, + // partition: QueuePartition, messages_for_current_shard: BinaryHeap>>, new_messages: BTreeMap>, iterators_manager: StatesIteratorsManager, @@ -128,9 +129,9 @@ impl QueueIterator for QueueIteratorImpl { let mut diff = QueueDiffWithMessages::new(); // fill processed_upto - for (shard_id, message_key) in self.last_processed_message.iter() { + for (shard_ident, message_key) in self.last_processed_message.iter() { // TODO: may be `diff.processed_upto` should be a HashMap and we can consume it from iterator - diff.processed_to.insert(*shard_id, *message_key); + diff.processed_to.insert(*shard_ident, *message_key); } // move new messages @@ -160,9 +161,9 @@ impl QueueIterator for QueueIteratorImpl { // actually we update last processed message via commit() // during the execution, so we can just use value as is - for (shard_id, message_key) in self.last_processed_message.iter() { + for (shard_ident, message_key) in self.last_processed_message.iter() { // TODO: may be `diff.processed_upto` should be a HashMap and we can consume it from iterator - diff.processed_to.insert(*shard_id, *message_key); + diff.processed_to.insert(*shard_ident, *message_key); } diff.messages = self.new_messages.clone(); diff --git a/collator/src/internal_queue/queue.rs b/collator/src/internal_queue/queue.rs index 53549c03f..2239b9cc4 100644 --- a/collator/src/internal_queue/queue.rs +++ b/collator/src/internal_queue/queue.rs @@ -5,9 +5,9 @@ use std::time::Duration; use anyhow::{bail, Result}; use everscale_types::cell::HashBytes; -use everscale_types::models::{BlockIdShort, ShardIdent}; +use everscale_types::models::{BlockIdShort, IntAddr, ShardIdent}; use serde::{Deserialize, Serialize}; -use tycho_block_util::queue::QueueKey; +use tycho_block_util::queue::{QueueKey, QueuePartition}; use tycho_util::{serde_helpers, FastDashMap, FastHashMap}; use crate::internal_queue::gc::GcManager; @@ -18,9 +18,13 @@ use crate::internal_queue::state::state_iterator::StateIterator; use crate::internal_queue::state::uncommitted_state::{ UncommittedState, UncommittedStateFactory, UncommittedStateImplFactory, UncommittedStateStdImpl, }; -use crate::internal_queue::types::{InternalMessageValue, QueueDiffWithMessages}; +use crate::internal_queue::types::{ + DiffStatistics, InternalMessageValue, PartitionQueueKey, QueueDiffWithMessages, QueueRange, + QueueShardRange, QueueStatistics, +}; use crate::tracing_targets; use crate::types::ProcessedTo; + // FACTORY #[derive(Debug, Serialize, Deserialize)] @@ -72,16 +76,17 @@ where /// Create iterator for specified shard and return it fn iterator( &self, - ranges: &FastHashMap, + partition: QueuePartition, + ranges: Vec, for_shard_id: ShardIdent, - ) -> Vec>>; + ) -> Result>>>; /// Add messages to uncommitted state from `diff.messages` and add diff to the cache fn apply_diff( &self, diff: QueueDiffWithMessages, block_id_short: BlockIdShort, - diff_hash: &HashBytes, - end_key: QueueKey, + hash: &HashBytes, + statistics: DiffStatistics, ) -> Result<()>; /// Move messages from uncommitted state to committed state and update gc ranges fn commit_diff(&self, mc_top_blocks: &[(BlockIdShort, bool)]) -> Result<()>; @@ -91,6 +96,12 @@ where fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize; /// removes all diffs from the cache that are less than `inclusive_until` which source shard is `source_shard` fn trim_diffs(&self, source_shard: &ShardIdent, inclusive_until: &QueueKey) -> Result<()>; + /// load statistics for the given range by accounts + fn load_statistics( + &self, + partition: QueuePartition, + ranges: Vec, + ) -> Result; } // IMPLEMENTATION @@ -146,18 +157,24 @@ where { fn iterator( &self, - ranges: &FastHashMap, + partition: QueuePartition, + ranges: Vec, for_shard_id: ShardIdent, - ) -> Vec>> { + ) -> Result>>> { let snapshot = self.committed_state.snapshot(); - let committed_state_iterator = - self.committed_state - .iterator(&snapshot, for_shard_id, ranges); + + let committed_state_iterator = self.committed_state.iterator( + &snapshot, + for_shard_id, + partition.clone(), + ranges.clone(), + )?; + let uncommitted_state_iterator = self.uncommitted_state - .iterator(&snapshot, for_shard_id, ranges); + .iterator(&snapshot, for_shard_id, partition, ranges)?; - vec![committed_state_iterator, uncommitted_state_iterator] + Ok(vec![committed_state_iterator, uncommitted_state_iterator]) } fn apply_diff( @@ -165,7 +182,7 @@ where diff: QueueDiffWithMessages, block_id_short: BlockIdShort, hash: &HashBytes, - end_key: QueueKey, + statistics: DiffStatistics, ) -> Result<()> { // Get or insert the shard diffs for the given block_id_short.shard let mut shard_diffs = self @@ -181,9 +198,8 @@ where "Duplicate diff with different hash: block_id={}, existing diff_hash={}, new diff_hash={}", block_id_short, shard_diff.hash, hash, ) - } else { - return Ok(()); } + return Ok(()); } let last_applied_seqno = shard_diffs.keys().next_back().cloned(); @@ -204,15 +220,26 @@ where } } + let max_message = diff + .messages + .keys() + .next_back() + .cloned() + .unwrap_or_default(); + // Add messages to uncommitted_state if there are any if !diff.messages.is_empty() { - self.uncommitted_state - .add_messages(block_id_short.shard, &diff.messages)?; + self.uncommitted_state.add_messages_with_statistics( + block_id_short.shard, + &diff.partition_router, + &diff.messages, + statistics, + )?; } let short_diff = ShortQueueDiff { processed_to: diff.processed_to, - end_key, + end_key: max_message, hash: *hash, }; @@ -224,8 +251,10 @@ where fn commit_diff(&self, mc_top_blocks: &[(BlockIdShort, bool)]) -> Result<()> { let mut shards_to_commit = FastHashMap::default(); + #[cfg(FALSE)] let mut gc_ranges = FastHashMap::default(); + #[cfg(FALSE)] for (block_id_short, top_shard_block_changed) in mc_top_blocks { let mut diffs_to_commit = vec![]; @@ -280,6 +309,7 @@ where metrics::counter!("tycho_internal_queue_uncommitted_diffs_count") .increment(uncommitted_diffs_count as u64); + #[cfg(FALSE)] for (shard, end_key) in gc_ranges { self.gc.update_delete_until(shard, end_key); } @@ -288,6 +318,7 @@ where } fn clear_uncommitted_state(&self) -> Result<()> { + self.uncommitted_state.truncate()?; let diffs_before_clear: usize = self.uncommitted_diffs.iter().map(|r| r.value().len()).sum(); self.uncommitted_diffs.clear(); @@ -298,7 +329,7 @@ where diffs_after_clear, "Cleared uncommitted diffs.", ); - self.uncommitted_state.truncate() + Ok(()) } fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize { @@ -327,4 +358,22 @@ where } Ok(()) } + + fn load_statistics( + &self, + partition: QueuePartition, + ranges: Vec, + ) -> Result { + let snapshot = self.committed_state.snapshot(); + let mut statistics = FastHashMap::default(); + + self.committed_state + .load_statistics(&mut statistics, &snapshot, partition, &ranges)?; + self.uncommitted_state + .load_statistics(&mut statistics, &snapshot, partition, &ranges)?; + + let statistics = QueueStatistics::new_with_statistics(statistics); + + Ok(statistics) + } } diff --git a/collator/src/internal_queue/state/commited_state.rs b/collator/src/internal_queue/state/commited_state.rs index cb7533b6c..3aee98279 100644 --- a/collator/src/internal_queue/state/commited_state.rs +++ b/collator/src/internal_queue/state/commited_state.rs @@ -1,6 +1,8 @@ use ahash::HashMapExt; -use everscale_types::models::ShardIdent; -use tycho_block_util::queue::QueueKey; +use anyhow::Result; +use everscale_types::models::{IntAddr, ShardIdent}; +use tycho_block_util::queue::{QueueKey, QueuePartition}; +use tycho_storage::model::StatKey; use tycho_storage::Storage; use tycho_util::FastHashMap; use weedb::OwnedSnapshot; @@ -8,8 +10,8 @@ use weedb::OwnedSnapshot; use crate::internal_queue::state::state_iterator::{ ShardIteratorWithRange, StateIterator, StateIteratorImpl, }; -use crate::internal_queue::types::InternalMessageValue; - +use crate::internal_queue::types::{InternalMessageValue, QueueRange, QueueShardRange}; +use crate::types::processed_upto::PartitionId; // CONFIG pub struct CommittedStateConfig { @@ -64,10 +66,18 @@ pub trait CommittedState: Send + Sync { &self, snapshot: &OwnedSnapshot, receiver: ShardIdent, - ranges: &FastHashMap, - ) -> Box>; + partition: QueuePartition, + ranges: Vec, + ) -> Result>>; - fn delete_messages(&self, shard: ShardIdent, key: &QueueKey) -> anyhow::Result<()>; + fn delete_messages(&self, range: QueueRange) -> anyhow::Result<()>; + fn load_statistics( + &self, + result: &mut FastHashMap, + snapshot: &OwnedSnapshot, + partition: QueuePartition, + range: &Vec, + ) -> Result<()>; } // IMPLEMENTATION @@ -91,26 +101,55 @@ impl CommittedState for CommittedStateStdImpl { &self, snapshot: &OwnedSnapshot, receiver: ShardIdent, - ranges: &FastHashMap, - ) -> Box> { - let mut shard_iters_with_ranges = FastHashMap::with_capacity(ranges.len()); + partition: QueuePartition, + ranges: Vec, + ) -> Result>> { + let mut shard_iters_with_ranges = Vec::new(); - for (&shard, range) in ranges { + for range in ranges { let iter = self .storage .internal_queue_storage() .build_iterator_committed(snapshot); - shard_iters_with_ranges - .insert(shard, ShardIteratorWithRange::new(iter, range.0, range.1)); + shard_iters_with_ranges.push((iter, range)); } - Box::new(StateIteratorImpl::new(shard_iters_with_ranges, receiver)) + let iterator = StateIteratorImpl::new(partition, shard_iters_with_ranges, receiver)?; + Ok(Box::new(iterator)) } - fn delete_messages(&self, shard: ShardIdent, until: &QueueKey) -> anyhow::Result<()> { + fn delete_messages(&self, range: QueueRange) -> anyhow::Result<()> { self.storage .internal_queue_storage() - .delete_messages(shard, &QueueKey::MIN, until) + .delete_messages(tycho_storage::model::QueueRange { + partition: range.partition, + shard_ident: range.shard_ident, + from: range.from, + to: range.to, + }) + } + + fn load_statistics( + &self, + result: &mut FastHashMap, + snapshot: &OwnedSnapshot, + partition: QueuePartition, + ranges: &Vec, + ) -> Result<()> { + for range in ranges { + self.storage + .internal_queue_storage() + .collect_commited_stats_in_range( + &snapshot, + range.shard_ident, + partition, + range.from, + range.to, + result, + )?; + } + + Ok(()) } } diff --git a/collator/src/internal_queue/state/shard_iterator.rs b/collator/src/internal_queue/state/shard_iterator.rs index aa8a1d182..793ac72b5 100644 --- a/collator/src/internal_queue/state/shard_iterator.rs +++ b/collator/src/internal_queue/state/shard_iterator.rs @@ -1,6 +1,6 @@ use anyhow::{Context, Result}; use everscale_types::models::ShardIdent; -use tycho_block_util::queue::QueueKey; +use tycho_block_util::queue::{QueueKey, QueuePartition}; use tycho_storage::model::ShardsInternalMessagesKey; use tycho_storage::owned_iterator::OwnedIterator; @@ -18,10 +18,12 @@ impl Range { } } -impl From<(QueueKey, QueueKey, ShardIdent)> for Range { - fn from(value: (QueueKey, QueueKey, ShardIdent)) -> Self { - let from = ShardsInternalMessagesKey::new(value.2, value.0); - let to = ShardsInternalMessagesKey::new(value.2, value.1); +impl From<(QueuePartition, ShardIdent, QueueKey, QueueKey)> for Range { + fn from(value: (QueuePartition, ShardIdent, QueueKey, QueueKey)) -> Self { + let (partition, shard_ident, from, to) = value; + + let from = ShardsInternalMessagesKey::new(partition, shard_ident, from); + let to = ShardsInternalMessagesKey::new(partition, shard_ident, to); Range { from, to } } @@ -29,32 +31,35 @@ impl From<(QueueKey, QueueKey, ShardIdent)> for Range { pub enum IterResult<'a> { Value(&'a [u8]), - Skip(Option), + Skip(Option<(ShardIdent, QueueKey)>), } pub struct ShardIterator { + partition: QueuePartition, + shard_ident: ShardIdent, range: Range, - source: ShardIdent, receiver: ShardIdent, iterator: OwnedIterator, } impl ShardIterator { pub fn new( - source: ShardIdent, + partition: QueuePartition, + shard_ident: ShardIdent, from: QueueKey, to: QueueKey, receiver: ShardIdent, mut iterator: OwnedIterator, ) -> Self { - iterator.seek(ShardsInternalMessagesKey::new(source, from)); + iterator.seek(ShardsInternalMessagesKey::new(partition, shard_ident, from)); - let range = Range::from((from, to, source)); + let range = Range::from((partition, shard_ident, from, to)); Self { + partition, + shard_ident, range, receiver, - source, iterator, } } @@ -67,9 +72,9 @@ impl ShardIterator { if let Some(key) = self.iterator.key() { let key = ShardsInternalMessagesKey::from(key); - if key.shard_ident != self.source { - return Ok(None); - } + // if key.shard_ident != self.source { + // return Ok(None); + // } if key == self.range.from { return Ok(Some(IterResult::Skip(None))); @@ -91,7 +96,10 @@ impl ShardIterator { return if self.receiver.contains_address(&short_addr) { Ok(Some(IterResult::Value(&value[9..]))) } else { - Ok(Some(IterResult::Skip(Some(key)))) + Ok(Some(IterResult::Skip(Some(( + key.shard_ident, + key.internal_message_key, + ))))) }; } Ok(None) diff --git a/collator/src/internal_queue/state/state_iterator.rs b/collator/src/internal_queue/state/state_iterator.rs index 88af72543..f9ded0703 100644 --- a/collator/src/internal_queue/state/state_iterator.rs +++ b/collator/src/internal_queue/state/state_iterator.rs @@ -1,16 +1,17 @@ use std::cmp::{Ordering, Reverse}; +use std::collections::hash_map::Entry; use std::collections::{BinaryHeap, HashSet}; use std::sync::Arc; use ahash::HashMapExt; -use anyhow::{Context, Result}; +use anyhow::{bail, Context, Result}; use everscale_types::models::ShardIdent; -use tycho_block_util::queue::QueueKey; +use tycho_block_util::queue::{QueueKey, QueuePartition}; use tycho_storage::owned_iterator::OwnedIterator; use tycho_util::FastHashMap; use crate::internal_queue::state::shard_iterator::{IterResult, ShardIterator}; -use crate::internal_queue::types::InternalMessageValue; +use crate::internal_queue::types::{InternalMessageValue, QueueRange, QueueShardRange}; pub struct ShardIteratorWithRange { pub iter: OwnedIterator, @@ -76,41 +77,52 @@ pub struct StateIteratorImpl { message_queue: BinaryHeap>>, in_queue: HashSet, current_position: FastHashMap, - shards_to_remove: Vec, + iters_to_remove: Vec, + partition: QueuePartition, } impl StateIteratorImpl { pub fn new( - shard_iters_with_ranges: FastHashMap, + partition: QueuePartition, + shard_iters_with_ranges: Vec<(OwnedIterator, QueueShardRange)>, receiver: ShardIdent, - ) -> Self { + ) -> Result { let mut iters = FastHashMap::with_capacity(shard_iters_with_ranges.len()); - for (shard_ident, shard_iter_with_range) in shard_iters_with_ranges { - let shard_iterator = ShardIterator::new( + for (iter, range) in shard_iters_with_ranges { + let QueueShardRange { shard_ident, - shard_iter_with_range.range_start, - shard_iter_with_range.range_end, - receiver, - shard_iter_with_range.iter, - ); + from, + to, + } = range; - iters.insert(shard_ident, shard_iterator); + let shard_iterator = + ShardIterator::new(partition, shard_ident, from, to, receiver, iter); + + match iters.entry(shard_ident) { + Entry::Occupied(_) => { + bail!("Iterator already exists for shard {:?}", shard_ident); + } + Entry::Vacant(entry) => { + entry.insert(shard_iterator); + } + } } - Self { + Ok(Self { iters, + partition, message_queue: BinaryHeap::new(), in_queue: HashSet::new(), current_position: Default::default(), - shards_to_remove: Vec::new(), - } + iters_to_remove: Vec::new(), + }) } fn refill_queue(&mut self) -> Result<()> { - self.shards_to_remove.clear(); + self.iters_to_remove.clear(); - for (&shard_ident, iter) in &mut self.iters { + for (shard_ident, iter) in &mut self.iters { if self.in_queue.contains(&shard_ident) { continue; } @@ -120,30 +132,31 @@ impl StateIteratorImpl { Some(IterResult::Value(value)) => { let message = V::deserialize(value).context("Failed to deserialize message")?; - let message_ext = MessageExt::new(shard_ident, Arc::new(message)); + + let message_ext = MessageExt::new(*shard_ident, Arc::new(message)); + self.message_queue.push(Reverse(message_ext)); - self.in_queue.insert(shard_ident); + self.in_queue.insert(shard_ident.clone()); iter.shift(); break; } - Some(IterResult::Skip(Some(key))) => { - self.current_position - .insert(key.shard_ident, key.internal_message_key); + Some(IterResult::Skip(Some((shard_partition, queue_key)))) => { + self.current_position.insert(shard_partition, queue_key); iter.shift(); } Some(IterResult::Skip(None)) => { iter.shift(); } None => { - self.shards_to_remove.push(shard_ident); + self.iters_to_remove.push(shard_ident.clone()); break; } } } } - for &shard_ident in &self.shards_to_remove { - self.iters.remove(&shard_ident); + for key in &self.iters_to_remove { + self.iters.remove(key); } Ok(()) @@ -156,7 +169,8 @@ impl StateIterator for StateIteratorImpl { if let Some(Reverse(message)) = self.message_queue.pop() { let message_key = message.message.key(); - self.current_position.insert(message.source, message_key); + self.current_position + .insert(message.source.clone(), message_key); self.in_queue.remove(&message.source); return Ok(Some(message)); diff --git a/collator/src/internal_queue/state/uncommitted_state.rs b/collator/src/internal_queue/state/uncommitted_state.rs index dfdc29bfa..b3d40f624 100644 --- a/collator/src/internal_queue/state/uncommitted_state.rs +++ b/collator/src/internal_queue/state/uncommitted_state.rs @@ -1,19 +1,25 @@ -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use ahash::HashMapExt; use anyhow::Result; -use everscale_types::models::ShardIdent; -use tycho_block_util::queue::QueueKey; +use everscale_types::cell::{Cell, CellBuilder, CellFamily, Store}; +use everscale_types::models::{IntAddr, ShardIdent}; +use everscale_types::prelude::Boc; +use serde::Serialize; +use tycho_block_util::queue::{QueueKey, QueuePartition}; +use tycho_storage::model::StatKey; use tycho_storage::Storage; -use tycho_util::FastHashMap; +use tycho_util::{FastHashMap, FastHashSet}; use weedb::rocksdb::WriteBatch; use weedb::OwnedSnapshot; use crate::internal_queue::state::state_iterator::{ ShardIteratorWithRange, StateIterator, StateIteratorImpl, }; -use crate::internal_queue::types::InternalMessageValue; +use crate::internal_queue::types::{ + DiffStatistics, InternalMessageValue, QueueRange, QueueShardRange, QueueStatistics, +}; // CONFIG @@ -64,18 +70,31 @@ pub trait UncommittedStateFactory { #[trait_variant::make(UncommittedState: Send)] pub trait LocalUncommittedState { - fn add_messages(&self, source: ShardIdent, messages: &BTreeMap>) - -> Result<()>; + fn add_messages_with_statistics( + &self, + source: ShardIdent, + partition_router: &FastHashMap, + messages: &BTreeMap>, + statistics: DiffStatistics, + ) -> Result<()>; fn iterator( &self, snapshot: &OwnedSnapshot, receiver: ShardIdent, - ranges: &FastHashMap, - ) -> Box>; + partition: QueuePartition, + ranges: Vec, + ) -> Result>>; fn commit_messages(&self, ranges: &FastHashMap) -> Result<()>; fn truncate(&self) -> Result<()>; + fn load_statistics( + &self, + result: &mut FastHashMap, + snapshot: &OwnedSnapshot, + partition: QueuePartition, + ranges: &Vec, + ) -> Result<()>; } // IMPLEMENTATION @@ -91,30 +110,19 @@ impl UncommittedStateStdImpl { } impl UncommittedState for UncommittedStateStdImpl { - /// write new messages to storage - fn add_messages( + fn add_messages_with_statistics( &self, source: ShardIdent, + partition_router: &FastHashMap, messages: &BTreeMap>, + statistics: DiffStatistics, ) -> Result<()> { let mut batch = WriteBatch::default(); - for (internal_message_key, message) in messages.iter() { - self.storage - .internal_queue_storage() - .insert_message_uncommitted( - &mut batch, - tycho_storage::model::ShardsInternalMessagesKey::new( - source, - *internal_message_key, - ), - message.destination(), - &message.serialize()?, - )?; - } + self.add_messages(&mut batch, source, partition_router, messages)?; + self.add_statistics(&mut batch, statistics)?; self.storage.internal_queue_storage().write_batch(batch)?; - Ok(()) } @@ -122,25 +130,30 @@ impl UncommittedState for UncommittedStateStdImpl { &self, snapshot: &OwnedSnapshot, receiver: ShardIdent, - ranges: &FastHashMap, - ) -> Box> { - let mut shard_iters_with_ranges = FastHashMap::with_capacity(ranges.len()); + partition: QueuePartition, + ranges: Vec, + ) -> Result>> { + let mut shard_iters_with_ranges = Vec::new(); - for (&shard, (start, end)) in ranges { + for range in ranges { let iter = self .storage .internal_queue_storage() .build_iterator_uncommitted(snapshot); - shard_iters_with_ranges.insert(shard, ShardIteratorWithRange::new(iter, *start, *end)); + shard_iters_with_ranges.push((iter, range)); } - Box::new(StateIteratorImpl::new(shard_iters_with_ranges, receiver)) + let iterator = StateIteratorImpl::new(partition, shard_iters_with_ranges, receiver)?; + Ok(Box::new(iterator)) } fn commit_messages(&self, ranges: &FastHashMap) -> Result<()> { + #[cfg(FALSE)] let ranges = ranges.iter().map(|(shard, key)| (*shard, *key)).collect(); - self.storage.internal_queue_storage().commit(ranges) + #[cfg(FALSE)] + self.storage.internal_queue_storage().commit(ranges); + Ok(()) } fn truncate(&self) -> Result<()> { @@ -148,4 +161,101 @@ impl UncommittedState for UncommittedStateStdImpl { .internal_queue_storage() .clear_uncommitted_queue() } + + fn load_statistics( + &self, + result: &mut FastHashMap, + snapshot: &OwnedSnapshot, + partition: QueuePartition, + ranges: &Vec, + ) -> Result<()> { + for range in ranges { + self.storage + .internal_queue_storage() + .collect_uncommited_stats_in_range( + &snapshot, + range.shard_ident, + partition, + range.from, + range.to, + result, + )?; + } + + Ok(()) + } +} + +impl UncommittedStateStdImpl { + /// write new messages to storage + fn add_messages( + &self, + batch: &mut WriteBatch, + source: ShardIdent, + partition_router: &FastHashMap, + messages: &BTreeMap>, + ) -> Result<()> { + for (internal_message_key, message) in messages { + let destination = message.destination(); + + let partition = partition_router + .get(&destination) + .unwrap_or(&QueuePartition::default()) + .clone(); + + self.storage + .internal_queue_storage() + .insert_message_uncommitted( + batch, + tycho_storage::model::ShardsInternalMessagesKey::new( + partition, + source.clone(), + *internal_message_key, + ), + destination, + &message.serialize()?, + )?; + } + + Ok(()) + } + + fn add_statistics( + &self, + batch: &mut WriteBatch, + diff_statistics: DiffStatistics, + ) -> Result<()> { + let shard_ident = diff_statistics.shard_ident(); + let min_message = diff_statistics.min_message(); + let max_message = diff_statistics.max_message(); + + for (index, (partition, values)) in diff_statistics.iter().enumerate() { + let cx = &mut Cell::empty_context(); + + for value in values { + let mut key_builder = CellBuilder::new(); + + let (addr, count) = value; + + addr.store_into(&mut key_builder, cx)?; + let dest = key_builder.build()?; + + let dest = Boc::encode(dest); + + let key = StatKey { + shard_ident: *shard_ident, + partition: partition.clone(), + min_message: min_message.clone(), + max_message: max_message.clone(), + index: index as u64, + }; + + self.storage + .internal_queue_storage() + .insert_destination_stat_uncommitted(batch, &key, &dest, *count)?; + } + } + + Ok(()) + } } diff --git a/collator/src/internal_queue/types.rs b/collator/src/internal_queue/types.rs index db6ad2942..86cef55b5 100644 --- a/collator/src/internal_queue/types.rs +++ b/collator/src/internal_queue/types.rs @@ -5,8 +5,9 @@ use std::sync::Arc; use anyhow::{bail, Context, Result}; use everscale_types::boc::Boc; use everscale_types::cell::{Cell, HashBytes, Load}; -use everscale_types::models::{IntAddr, IntMsgInfo, Message, MsgInfo, OutMsgDescr}; -use tycho_block_util::queue::{QueueDiff, QueueDiffStuff, QueueKey}; +use everscale_types::models::{IntAddr, IntMsgInfo, Message, MsgInfo, OutMsgDescr, ShardIdent}; +use tycho_block_util::queue::{QueueDiff, QueueDiffStuff, QueueKey, QueuePartition}; +use tycho_util::FastHashMap; use super::state::state_iterator::MessageExt; use crate::types::ProcessedTo; @@ -15,6 +16,7 @@ use crate::types::ProcessedTo; pub struct QueueDiffWithMessages { pub messages: BTreeMap>, pub processed_to: ProcessedTo, + pub partition_router: FastHashMap, } impl QueueDiffWithMessages { @@ -22,6 +24,7 @@ impl QueueDiffWithMessages { Self { messages: BTreeMap::new(), processed_to: BTreeMap::new(), + partition_router: Default::default(), } } } @@ -32,7 +35,7 @@ impl QueueDiffWithMessages { out_msg_description: &OutMsgDescr, ) -> Result { let QueueDiff { processed_to, .. } = queue_diff_stuff.as_ref(); - let processed_to = processed_to + let processed_to: BTreeMap = processed_to .iter() .map(|(shard_ident, key)| (*shard_ident, *key)) .collect(); @@ -53,6 +56,7 @@ impl QueueDiffWithMessages { Ok(Self { messages, processed_to, + partition_router: Default::default(), }) } } @@ -172,3 +176,125 @@ impl InternalMessageValue for EnqueuedMessage { self.key() } } + +pub struct PartitionQueueKey { + pub partition: QueuePartition, + pub key: QueueKey, +} +#[derive(Debug, Clone)] +pub struct QueueShardRange { + pub shard_ident: ShardIdent, + pub from: QueueKey, + pub to: QueueKey, +} + +#[derive(Debug, Clone)] +pub struct QueueRange { + pub partition: QueuePartition, + pub shard_ident: ShardIdent, + pub from: QueueKey, + pub to: QueueKey, +} + +pub struct QueueStatistics { + statistics: FastHashMap, +} + +impl QueueStatistics { + pub fn new() -> Self { + Self { + statistics: Default::default(), + } + } + pub fn new_with_statistics(statistics: FastHashMap) -> Self { + Self { statistics } + } + + pub fn show(&self) -> FastHashMap { + self.statistics.clone() + } + + pub fn append(&mut self, other: &Self) { + for (key, value) in other.statistics.iter() { + *self.statistics.entry(key.clone()).or_insert(0) += *value; + } + } + + pub fn apply_diff_statistics(&mut self, diff_statistics: DiffStatistics) { + let diff_statistics = diff_statistics.inner.statistics.clone(); + for (_, values) in diff_statistics.iter() { + for value in values.iter() { + *self.statistics.entry(value.0.clone()).or_insert(0) += *value.1; + } + } + } +} + +pub struct DiffStatistics { + inner: Arc, +} + +impl DiffStatistics { + pub fn iter(&self) -> impl Iterator)> { + self.inner.statistics.iter() + } + + pub fn shard_ident(&self) -> &ShardIdent { + &self.inner.shard_ident + } + + pub fn min_message(&self) -> &QueueKey { + &self.inner.min_message + } + + pub fn max_message(&self) -> &QueueKey { + &self.inner.max_message + } +} + +struct DiffStatisticsInner { + shard_ident: ShardIdent, + min_message: QueueKey, + max_message: QueueKey, + statistics: FastHashMap>, +} + +impl From<(QueueDiffWithMessages, ShardIdent)> for DiffStatistics { + fn from(value: (QueueDiffWithMessages, ShardIdent)) -> Self { + let (diff, shard_ident) = value; + let min_message = diff.messages.keys().next().cloned().unwrap_or_default(); + let max_message = diff + .messages + .keys() + .next_back() + .cloned() + .unwrap_or_default(); + + let mut statistics = FastHashMap::default(); + + for (_, message) in diff.messages { + let destination = message.destination(); + + let partition = diff + .partition_router + .get(&destination) + .unwrap_or(&QueuePartition::default()) + .clone(); + + *statistics + .entry(partition) + .or_insert(FastHashMap::default()) + .entry(destination.clone()) + .or_insert(0) += 1; + } + + Self { + inner: Arc::new(DiffStatisticsInner { + shard_ident, + min_message, + max_message, + statistics, + }), + } + } +} diff --git a/collator/src/manager/mod.rs b/collator/src/manager/mod.rs index 2b7330ef2..5a5068fe3 100644 --- a/collator/src/manager/mod.rs +++ b/collator/src/manager/mod.rs @@ -1,4 +1,4 @@ -use std::collections::{hash_map, BTreeMap, VecDeque}; +use std::collections::{hash_map, VecDeque}; use std::sync::Arc; use ahash::HashMapExt; @@ -27,7 +27,7 @@ use crate::collator::{ CollationCancelReason, Collator, CollatorContext, CollatorEventListener, CollatorFactory, ForceMasterCollation, }; -use crate::internal_queue::types::{EnqueuedMessage, QueueDiffWithMessages}; +use crate::internal_queue::types::{DiffStatistics, EnqueuedMessage, QueueDiffWithMessages}; use crate::manager::types::BlockCacheStoreResult; use crate::mempool::{ MempoolAdapter, MempoolAdapterFactory, MempoolAnchor, MempoolAnchorId, MempoolEventListener, @@ -472,11 +472,14 @@ where let queue_diff_with_msgs = QueueDiffWithMessages::from_queue_diff(queue_diff, &out_msgs.load()?)?; + + let statistics = (queue_diff_with_msgs.clone(), queue_diff.block_id().shard).into(); + mq_adapter.apply_diff( queue_diff_with_msgs, queue_diff.block_id().as_short_id(), queue_diff.diff_hash(), - queue_diff.diff().max_message, + statistics, ) } @@ -1189,10 +1192,11 @@ where .await?; // calc internals processed upto - let mut min_processed_to_by_shards = BTreeMap::default(); + let mut min_processed_to_by_shards = ProcessedTo::default(); + // find min processed to by shards for trim tail for min_processed_upto in processed_to_by_shards.values() { - for (shard_id, to_key) in min_processed_upto { + for (shard_id, to_key) in min_processed_upto.clone() { min_processed_to_by_shards .entry(shard_id) .and_modify(|min| *min = std::cmp::min(*min, to_key)) @@ -1201,7 +1205,7 @@ where } tracing::debug!(target: tracing_targets::COLLATION_MANAGER, - min_processed_to_by_shards = %DisplayIter(min_processed_to_by_shards.iter().map(DisplayTuple)), + ?min_processed_to_by_shards, ); // find first applied mc block and tail shard blocks and get previous @@ -1299,16 +1303,13 @@ where // apply required previous queue diffs while let Some((diff, diff_hash, block_id, max_message_key)) = prev_queue_diffs.pop() { - self.mq_adapter.apply_diff( - diff, - block_id.as_short_id(), - &diff_hash, - max_message_key, - )?; + let statistics = (diff.clone(), block_id.shard).into(); + self.mq_adapter + .apply_diff(diff, block_id.as_short_id(), &diff_hash, statistics)?; } // trim diffs tails for all shards for (shard_id, min_processed_to) in min_processed_to_by_shards { - self.mq_adapter.trim_diffs(shard_id, min_processed_to)?; + self.mq_adapter.trim_diffs(&shard_id, &min_processed_to)?; } // sync all applied blocks diff --git a/collator/src/queue_adapter.rs b/collator/src/queue_adapter.rs index 8b6e4d94f..80fbfea3f 100644 --- a/collator/src/queue_adapter.rs +++ b/collator/src/queue_adapter.rs @@ -2,17 +2,20 @@ use anyhow::Result; use everscale_types::cell::HashBytes; use everscale_types::models::{BlockIdShort, ShardIdent}; use tracing::instrument; -use tycho_block_util::queue::QueueKey; +use tycho_block_util::queue::{QueueKey, QueuePartition}; use tycho_util::FastHashMap; use crate::internal_queue::iterator::{QueueIterator, QueueIteratorExt, QueueIteratorImpl}; use crate::internal_queue::queue::{Queue, QueueImpl}; use crate::internal_queue::state::commited_state::CommittedStateStdImpl; +use crate::internal_queue::state::state_iterator::IterRange; use crate::internal_queue::state::states_iterators_manager::StatesIteratorsManager; use crate::internal_queue::state::uncommitted_state::UncommittedStateStdImpl; -use crate::internal_queue::types::{InternalMessageValue, QueueDiffWithMessages}; +use crate::internal_queue::types::{ + DiffStatistics, InternalMessageValue, QueueDiffWithMessages, QueueRange, QueueShardRange, +}; use crate::tracing_targets; -use crate::types::{DisplayIter, DisplayTuple, DisplayTupleRef}; +use crate::types::{DisplayIter, DisplayNestedMap, DisplayTuple, DisplayTupleRef}; pub struct MessageQueueAdapterStdImpl { queue: QueueImpl, @@ -26,8 +29,8 @@ where fn create_iterator( &self, for_shard_id: ShardIdent, - shards_from: FastHashMap, - shards_to: FastHashMap, + partition: QueuePartition, + ranges: Vec, ) -> Result>>; /// Apply diff to the current queue uncommitted state (waiting for the operation to complete) fn apply_diff( @@ -35,7 +38,7 @@ where diff: QueueDiffWithMessages, block_id_short: BlockIdShort, diff_hash: &HashBytes, - end_key: QueueKey, + statistics: DiffStatistics, ) -> Result<()>; /// Commit previously applied diff, saving changes to committed state (waiting for the operation to complete). @@ -71,29 +74,22 @@ impl MessageQueueAdapterStdImpl { } impl MessageQueueAdapter for MessageQueueAdapterStdImpl { - #[instrument(skip_all, fields(%for_shard_id))] + #[instrument(skip_all, fields(%for_shard_id, ranges = ?ranges))] fn create_iterator( &self, for_shard_id: ShardIdent, - shards_from: FastHashMap, - shards_to: FastHashMap, + partition: QueuePartition, + ranges: Vec, ) -> Result>> { let time_start = std::time::Instant::now(); - let ranges = QueueIteratorExt::collect_ranges(shards_from, shards_to); - - let states_iterators = self.queue.iterator(&ranges, for_shard_id); + let states_iterators = self.queue.iterator(partition, ranges, for_shard_id)?; let states_iterators_manager = StatesIteratorsManager::new(states_iterators); - let iterator = QueueIteratorImpl::new(states_iterators_manager, for_shard_id)?; + tracing::info!( target: tracing_targets::MQ_ADAPTER, - range = %DisplayIter(ranges - .iter() - .map(|(k, v)| DisplayTuple((k, DisplayTupleRef(v)))) - ), elapsed = %humantime::format_duration(time_start.elapsed()), - for_shard_id = %for_shard_id, "Iterator created" ); Ok(Box::new(iterator)) @@ -105,17 +101,18 @@ impl MessageQueueAdapter for MessageQueueAdapterStdI diff: QueueDiffWithMessages, block_id_short: BlockIdShort, hash: &HashBytes, - end_key: QueueKey, + statistics: DiffStatistics, ) -> Result<()> { let time = std::time::Instant::now(); let len = diff.messages.len(); let processed_to = diff.processed_to.clone(); - self.queue.apply_diff(diff, block_id_short, hash, end_key)?; + self.queue + .apply_diff(diff, block_id_short, hash, statistics)?; tracing::info!(target: tracing_targets::MQ_ADAPTER, new_messages_len = len, elapsed = ?time.elapsed(), - processed_to = %DisplayIter(processed_to.iter().map(DisplayTuple)), + processed_to = ?processed_to, "Diff applied", ); Ok(()) @@ -165,8 +162,8 @@ impl MessageQueueAdapter for MessageQueueAdapterStdI fn trim_diffs(&self, source_shard: &ShardIdent, inclusive_until: &QueueKey) -> Result<()> { tracing::trace!( target: tracing_targets::MQ_ADAPTER, - source_shard = %source_shard, - inclusive_until = %inclusive_until, + source_shard = ?source_shard, + inclusive_until = ?inclusive_until, "Trimming diffs" ); self.queue.trim_diffs(source_shard, inclusive_until) diff --git a/collator/src/types.rs b/collator/src/types.rs index c1409a013..391449327 100644 --- a/collator/src/types.rs +++ b/collator/src/types.rs @@ -509,6 +509,8 @@ impl std::fmt::Display for Display } } +pub(super) struct DisplayNestedMap<'a, OK, IK, V>(pub &'a BTreeMap>); + #[derive(Debug, Clone, Copy, Eq, PartialEq, Default)] pub struct ShardDescriptionShort { pub ext_processed_to_anchor_id: u32, diff --git a/collator/tests/internal_queue.rs b/collator/tests/internal_queue.rs index 5d4137b54..8bb88f72c 100644 --- a/collator/tests/internal_queue.rs +++ b/collator/tests/internal_queue.rs @@ -12,7 +12,7 @@ use everscale_types::models::{ Transaction, TxInfo, }; use everscale_types::num::Tokens; -use tycho_block_util::queue::{QueueDiff, QueueDiffStuff, QueueKey}; +use tycho_block_util::queue::{QueueDiff, QueueDiffStuff, QueueKey, QueuePartition}; use tycho_collator::internal_queue::queue::{ Queue, QueueConfig, QueueFactory, QueueFactoryStdImpl, QueueImpl, }; @@ -23,7 +23,9 @@ use tycho_collator::internal_queue::state::states_iterators_manager::StatesItera use tycho_collator::internal_queue::state::uncommitted_state::{ UncommittedStateImplFactory, UncommittedStateStdImpl, }; -use tycho_collator::internal_queue::types::{InternalMessageValue, QueueDiffWithMessages}; +use tycho_collator::internal_queue::types::{ + DiffStatistics, InternalMessageValue, QueueDiffWithMessages, QueueRange, QueueShardRange, +}; use tycho_collator::test_utils::prepare_test_storage; use tycho_util::FastHashMap; @@ -139,9 +141,26 @@ async fn test_queue() -> anyhow::Result<()> { .insert(stored_object.key(), stored_object.clone()); } - let end_key = *diff.messages.iter().last().unwrap().0; + let mut partition_router = FastHashMap::default(); + + for stored_object in &stored_objects { + partition_router.insert(stored_object.dest.clone(), QueuePartition::LowPriority); + } + + let diff_with_messages = QueueDiffWithMessages { + messages: diff.messages, + processed_to: diff.processed_to, + partition_router, + }; + + let statistics = (diff_with_messages.clone(), block.shard).into(); - queue.apply_diff(diff, block, &HashBytes::from([1; 32]), end_key)?; + queue.apply_diff( + diff_with_messages, + block, + &HashBytes::from([1; 32]), + statistics, + )?; let top_blocks = vec![(block, true)]; @@ -184,26 +203,46 @@ async fn test_queue() -> anyhow::Result<()> { let top_blocks = vec![(block2, true)]; - let end_key = *diff.messages.iter().last().unwrap().0; - queue.apply_diff(diff, block2, &HashBytes::from([0; 32]), end_key)?; + let mut partition_router = FastHashMap::default(); + + for stored_object in &stored_objects2 { + partition_router.insert(stored_object.dest.clone(), QueuePartition::LowPriority); + } + + let diff_with_messages = QueueDiffWithMessages { + messages: diff.messages, + processed_to: diff.processed_to, + partition_router, + }; + + let statistics = (diff_with_messages.clone(), block.shard).into(); + + queue.apply_diff( + diff_with_messages, + block2, + &HashBytes::from([0; 32]), + statistics, + )?; queue.commit_diff(&top_blocks)?; - let mut ranges = FastHashMap::default(); - ranges.insert( - ShardIdent::new_full(0), - ( - QueueKey { - lt: 1, - hash: HashBytes::default(), - }, - QueueKey { - lt: 4, - hash: HashBytes::default(), - }, - ), - ); + let mut ranges = Vec::new(); + + let queue_range = QueueShardRange { + shard_ident: ShardIdent::new_full(0), + from: QueueKey { + lt: 1, + hash: HashBytes::default(), + }, + to: QueueKey { + lt: 4, + hash: HashBytes::default(), + }, + }; + + ranges.push(queue_range); - let iterators = queue.iterator(&ranges, ShardIdent::new_full(-1)); + let partition = QueuePartition::LowPriority; + let iterators = queue.iterator(partition, ranges, ShardIdent::new_full(-1))?; let mut iterator_manager = StatesIteratorsManager::new(iterators); iterator_manager.next().ok(); @@ -256,33 +295,47 @@ async fn test_queue_clear() -> anyhow::Result<()> { .insert(stored_object.key(), stored_object.clone()); } - let end_key = *diff.messages.iter().last().unwrap().0; + let diff_with_messages = QueueDiffWithMessages { + messages: diff.messages, + processed_to: diff.processed_to, + partition_router: Default::default(), + }; - queue.apply_diff(diff, block, &HashBytes::from([1; 32]), end_key)?; + let statistics = (diff_with_messages.clone(), block.shard).into(); - let mut ranges = FastHashMap::default(); - ranges.insert( - ShardIdent::new_full(0), - ( - QueueKey { - lt: 1, - hash: HashBytes::default(), - }, - QueueKey { - lt: 4, - hash: HashBytes::default(), - }, - ), - ); + queue.apply_diff( + diff_with_messages, + block, + &HashBytes::from([1; 32]), + statistics, + )?; - let iterators = queue.iterator(&ranges, ShardIdent::new_full(1)); + let mut ranges = Vec::new(); + + let queue_range = QueueShardRange { + shard_ident: ShardIdent::new_full(0), + from: QueueKey { + lt: 1, + hash: HashBytes::default(), + }, + to: QueueKey { + lt: 4, + hash: HashBytes::default(), + }, + }; + + ranges.push(queue_range); + + let partition = QueuePartition::NormalPriority; + + let iterators = queue.iterator(partition, ranges.clone(), ShardIdent::new_full(1))?; let mut iterator_manager = StatesIteratorsManager::new(iterators); assert!(iterator_manager.next().ok().is_some()); queue.clear_uncommitted_state()?; - let iterators = queue.iterator(&ranges, ShardIdent::new_full(1)); + let iterators = queue.iterator(partition, ranges.clone(), ShardIdent::new_full(1))?; let mut iterator_manager = StatesIteratorsManager::new(iterators); assert!(iterator_manager.next()?.is_none()); @@ -290,6 +343,87 @@ async fn test_queue_clear() -> anyhow::Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_statistics() -> anyhow::Result<()> { + let (storage, _tmp_dir) = prepare_test_storage().await.unwrap(); + + let queue_factory = QueueFactoryStdImpl { + uncommitted_state_factory: UncommittedStateImplFactory { + storage: storage.clone(), + }, + committed_state_factory: CommittedStateImplFactory { storage }, + config: QueueConfig { + gc_interval: Duration::from_secs(1), + }, + }; + + let queue: QueueImpl = + queue_factory.create(); + let block = BlockIdShort { + shard: ShardIdent::new_full(0), + seqno: 0, + }; + let mut diff = QueueDiffWithMessages::new(); + + let stored_objects = vec![create_stored_object( + 1, + "1:6d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732", + )?]; + + for stored_object in &stored_objects { + diff.messages + .insert(stored_object.key(), stored_object.clone()); + } + + let start_key = *diff.messages.iter().next().unwrap().0; + let end_key = *diff.messages.iter().last().unwrap().0; + let diff_with_messages = QueueDiffWithMessages { + messages: diff.messages, + processed_to: diff.processed_to, + partition_router: Default::default(), + }; + + let statistics: DiffStatistics = (diff_with_messages.clone(), block.shard).into(); + + for stat in statistics.iter() { + assert_eq!(stat.1.len(), 1); + } + + queue.apply_diff( + diff_with_messages, + block, + &HashBytes::from([1; 32]), + statistics, + )?; + + let partition = QueuePartition::NormalPriority; + + let range = QueueShardRange { + shard_ident: ShardIdent::new_full(0), + from: start_key, + to: end_key, + }; + + let ranges = vec![range.clone()]; + + let stat = queue.load_statistics(partition, ranges)?; + + for s in stat.show() { + println!("{:?}", s); + } + + assert_eq!(*stat.show().iter().next().unwrap().1, 1); + + let ranges = vec![range.clone(), range]; + + let stat = queue.load_statistics(partition, ranges)?; + + assert_eq!(*stat.show().iter().next().unwrap().1, 2); + + Ok(()) +} + +#[cfg(FALSE)] #[test] fn test_queue_diff_with_messages_from_queue_diff_stuff() -> anyhow::Result<()> { let mut out_msg = OutMsgDescr::default(); @@ -457,6 +591,7 @@ fn create_dump_msg_envelope(message: Lazy) -> Lazy { .unwrap() } +#[cfg(FALSE)] #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn test_queue_tail() -> anyhow::Result<()> { let (storage, _tmp_dir) = prepare_test_storage().await.unwrap(); diff --git a/storage/src/db/kv_db/mod.rs b/storage/src/db/kv_db/mod.rs index 4254e0117..bde71185d 100644 --- a/storage/src/db/kv_db/mod.rs +++ b/storage/src/db/kv_db/mod.rs @@ -134,6 +134,8 @@ weedb::tables! { pub block_connections: tables::BlockConnections, pub shards_internal_messages: tables::ShardsInternalMessages, pub shards_internal_messages_uncommitted: tables::ShardsInternalMessagesSession, + pub internal_messages_dest_stat: tables::InternalMessagesDestStat, + pub internal_messages_dest_stat_uncommitted: tables::InternalMessagesDestStatUncommitted, } } diff --git a/storage/src/db/kv_db/tables.rs b/storage/src/db/kv_db/tables.rs index d426f0f40..83428951b 100644 --- a/storage/src/db/kv_db/tables.rs +++ b/storage/src/db/kv_db/tables.rs @@ -441,6 +441,36 @@ impl ColumnFamilyOptions for ShardsInternalMessagesSession { } } +pub struct InternalMessagesDestStatUncommitted; +impl ColumnFamily for InternalMessagesDestStatUncommitted { + const NAME: &'static str = "internal_messages_dest_stat_uncommitted"; + + fn read_options(opts: &mut ReadOptions) { + opts.set_verify_checksums(true); + } +} + +impl ColumnFamilyOptions for InternalMessagesDestStatUncommitted { + fn options(opts: &mut Options, caches: &mut Caches) { + zstd_block_based_table_factory(opts, caches); + } +} + +pub struct InternalMessagesDestStat; +impl ColumnFamily for InternalMessagesDestStat { + const NAME: &'static str = "internal_messages_dest_stat"; + + fn read_options(opts: &mut ReadOptions) { + opts.set_verify_checksums(true); + } +} + +impl ColumnFamilyOptions for InternalMessagesDestStat { + fn options(opts: &mut Options, caches: &mut Caches) { + zstd_block_based_table_factory(opts, caches); + } +} + fn archive_data_merge( _: &[u8], current_value: Option<&[u8]>, diff --git a/storage/src/store/internal_queue/mod.rs b/storage/src/store/internal_queue/mod.rs index de9ac92e3..b275a0841 100644 --- a/storage/src/store/internal_queue/mod.rs +++ b/storage/src/store/internal_queue/mod.rs @@ -1,14 +1,21 @@ +use std::collections::HashMap; use std::fs::File; -use anyhow::Result; +use ahash::RandomState; +use anyhow::{Error, Result}; +use everscale_types::boc::Boc; +use everscale_types::cell::{Cell, CellSlice, Load}; use everscale_types::models::{IntAddr, Message, MsgInfo, OutMsgQueueUpdates, ShardIdent}; -use tycho_block_util::queue::QueueKey; +use tycho_block_util::queue::{QueueKey, QueuePartition}; use tycho_util::FastHashMap; -use weedb::rocksdb::{ReadOptions, WriteBatch}; +use weedb::rocksdb::{ + DBCommon, DBRawIterator, DBRawIteratorWithThreadMode, MultiThreaded, ReadOptions, WriteBatch, + WriteBatchWithTransaction, +}; use weedb::{BoundedCfHandle, OwnedSnapshot}; use crate::db::*; -use crate::model::ShardsInternalMessagesKey; +use crate::model::{QueueRange, ShardsInternalMessagesKey, StatKey}; use crate::store::QueueStateReader; use crate::util::{OwnedIterator, StoredValue}; @@ -20,6 +27,137 @@ pub struct InternalQueueStorage { } impl InternalQueueStorage { + pub fn insert_destination_stat_uncommitted( + &self, + batch: &mut WriteBatchWithTransaction, + key: &StatKey, + dest: &[u8], + count: u64, + ) -> Result<()> { + let cf = self.db.internal_messages_dest_stat_uncommitted.cf(); + let mut key_buffer = Vec::with_capacity(StatKey::SIZE_HINT); + key.serialize(&mut key_buffer); + + let mut value_buffer = Vec::with_capacity(std::mem::size_of::() + dest.len()); + + unsafe { + let count_bytes = count.to_be_bytes(); + let ptr = value_buffer.as_mut_ptr(); + + std::ptr::copy_nonoverlapping(count_bytes.as_ptr(), ptr, count_bytes.len()); + + std::ptr::copy_nonoverlapping(dest.as_ptr(), ptr.add(count_bytes.len()), dest.len()); + + value_buffer.set_len(count_bytes.len() + dest.len()); + } + + batch.put_cf(&cf, &key_buffer, &value_buffer); + + Ok(()) + } + + pub fn collect_commited_stats_in_range( + &self, + snapshot: &OwnedSnapshot, + shard_ident: ShardIdent, + partition: QueuePartition, + from: QueueKey, + to: QueueKey, + result: &mut FastHashMap, + ) -> Result<()> { + let mut read_config = self.db.internal_messages_dest_stat.new_read_config(); + read_config.set_snapshot(snapshot); + let cf = self.db.internal_messages_dest_stat.cf(); + + let mut iter = self.db.rocksdb().raw_iterator_cf_opt(&cf, read_config); + + self.collect_dest_counts_in_range(&mut iter, shard_ident, partition, from, to, result) + } + + pub fn collect_uncommited_stats_in_range( + &self, + snapshot: &OwnedSnapshot, + shard_ident: ShardIdent, + partition: QueuePartition, + from: QueueKey, + to: QueueKey, + result: &mut FastHashMap, + ) -> Result<()> { + let mut read_config = self + .db + .internal_messages_dest_stat_uncommitted + .new_read_config(); + read_config.set_snapshot(snapshot); + let cf = self.db.internal_messages_dest_stat_uncommitted.cf(); + + let mut iter = self.db.rocksdb().raw_iterator_cf_opt(&cf, read_config); + + self.collect_dest_counts_in_range(&mut iter, shard_ident, partition, from, to, result) + } + + fn collect_dest_counts_in_range( + &self, + iter: &mut DBRawIterator<'_>, + shard_ident: ShardIdent, + partition: QueuePartition, + from: QueueKey, + to: QueueKey, + result: &mut FastHashMap, + ) -> Result<()> { + let from_key = StatKey { + shard_ident, + partition, + min_message: from, + max_message: QueueKey::MIN, + index: 0, + }; + + let from_key_bytes = { + let mut buf = Vec::with_capacity(StatKey::SIZE_HINT); + from_key.serialize(&mut buf); + buf + }; + + iter.seek(&from_key_bytes); + + while iter.valid() { + let key_bytes = iter.key(); + let value_bytes = iter.value(); + + match (key_bytes, value_bytes) { + (Some(mut k), Some(v)) => { + let current_key = StatKey::deserialize(&mut k); + + if current_key.shard_ident != shard_ident || current_key.partition != partition + { + break; + } + + if current_key.max_message > to { + break; + } + + let (count_bytes, dest_bytes) = v.split_at(8); + let count = u64::from_be_bytes(count_bytes.try_into().unwrap()); + + let cell = Boc::decode(dest_bytes)?; + + let int_addr = IntAddr::load_from(&mut cell.as_slice()?)?; + + let entry = result.entry(int_addr).or_insert(0); + *entry += count; + } + _ => { + break; + } + } + + iter.next(); + } + + Ok(()) + } + pub fn new(db: BaseDb) -> Self { Self { db } } @@ -59,6 +197,8 @@ impl InternalQueueStorage { }; let key = ShardsInternalMessagesKey { + // TODO !!! read it + partition: QueuePartition::NormalPriority, shard_ident, internal_message_key: QueueKey { lt: int_msg_info.created_lt, @@ -81,14 +221,10 @@ impl InternalQueueStorage { .await? } - pub fn delete_messages( - &self, - source_shard: ShardIdent, - from: &QueueKey, - to: &QueueKey, - ) -> Result<()> { - let start_key = ShardsInternalMessagesKey::new(source_shard, *from); - let end_key = ShardsInternalMessagesKey::new(source_shard, *to); + pub fn delete_messages(&self, range: QueueRange) -> Result<()> { + let start_key = + ShardsInternalMessagesKey::new(range.partition, range.shard_ident, range.from); + let end_key = ShardsInternalMessagesKey::new(range.partition, range.shard_ident, range.to); let shards_internal_messages_cf = self.db.shards_internal_messages.cf(); @@ -110,19 +246,21 @@ impl InternalQueueStorage { Ok(()) } - pub fn commit(&self, ranges: FastHashMap) -> Result<()> { + pub fn commit(&self, ranges: Vec) -> Result<()> { let snapshot = self.snapshot(); let mut batch = WriteBatch::default(); for range in ranges { let from = ShardsInternalMessagesKey { - shard_ident: range.0, - internal_message_key: QueueKey::MIN, + partition: range.partition, + shard_ident: range.shard_ident, + internal_message_key: range.from, }; let to = ShardsInternalMessagesKey { - shard_ident: range.0, - internal_message_key: range.1, + partition: range.partition, + shard_ident: range.shard_ident, + internal_message_key: range.to, }; let mut readopts = self diff --git a/storage/src/store/internal_queue/model.rs b/storage/src/store/internal_queue/model.rs index e02e184b6..12a6457bc 100644 --- a/storage/src/store/internal_queue/model.rs +++ b/storage/src/store/internal_queue/model.rs @@ -1,18 +1,24 @@ use everscale_types::cell::HashBytes; use everscale_types::models::ShardIdent; -use tycho_block_util::queue::QueueKey; +use tycho_block_util::queue::{QueueKey, QueuePartition}; use crate::util::{StoredValue, StoredValueBuffer}; -#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct ShardsInternalMessagesKey { pub shard_ident: ShardIdent, + pub partition: QueuePartition, pub internal_message_key: QueueKey, } impl ShardsInternalMessagesKey { - pub fn new(shard_ident: ShardIdent, internal_message_key: QueueKey) -> Self { + pub fn new( + partition: QueuePartition, + shard_ident: ShardIdent, + internal_message_key: QueueKey, + ) -> Self { Self { + partition, shard_ident, internal_message_key, } @@ -32,6 +38,7 @@ impl StoredValue for ShardsInternalMessagesKey { type OnStackSlice = [u8; Self::SIZE_HINT]; fn serialize(&self, buffer: &mut T) { + self.partition.serialize(buffer); self.shard_ident.serialize(buffer); self.internal_message_key.serialize(buffer); } @@ -41,16 +48,50 @@ impl StoredValue for ShardsInternalMessagesKey { panic!("Insufficient data for deserialization") } + let partition = QueuePartition::deserialize(reader); let shard_ident = ShardIdent::deserialize(reader); let internal_message_key = QueueKey::deserialize(reader); Self { + partition, shard_ident, internal_message_key, } } } +impl StoredValue for QueuePartition { + const SIZE_HINT: usize = 1; + + type OnStackSlice = [u8; Self::SIZE_HINT]; + + fn serialize(&self, buffer: &mut T) { + let value = match self { + QueuePartition::NormalPriority => 0u8, + QueuePartition::LowPriority => 1u8, + }; + buffer.write_raw_slice(&[value]); + } + + fn deserialize(reader: &mut &[u8]) -> Self + where + Self: Sized, + { + if reader.len() < Self::SIZE_HINT { + panic!("Insufficient data for deserialization"); + } + + let value = reader[0]; + *reader = &reader[1..]; + + match value { + 0 => QueuePartition::NormalPriority, + 1 => QueuePartition::LowPriority, + _ => panic!("Unknown value for QueuePartition: {}", value), + } + } +} + impl StoredValue for QueueKey { const SIZE_HINT: usize = 8 + 32; @@ -79,3 +120,81 @@ impl StoredValue for QueueKey { Self { lt, hash } } } + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub struct StatKey { + pub shard_ident: ShardIdent, + pub partition: QueuePartition, + pub min_message: QueueKey, + pub max_message: QueueKey, + pub index: u64, +} + +impl StatKey { + pub fn new( + shard_ident: ShardIdent, + partition: QueuePartition, + min_message: QueueKey, + max_message: QueueKey, + index: u64, + ) -> Self { + Self { + shard_ident, + partition, + min_message, + max_message, + index, + } + } +} + +impl StoredValue for StatKey { + const SIZE_HINT: usize = ShardIdent::SIZE_HINT + + QueuePartition::SIZE_HINT + + QueueKey::SIZE_HINT * 2 + + std::mem::size_of::(); + + type OnStackSlice = [u8; ShardIdent::SIZE_HINT + + QueuePartition::SIZE_HINT + + QueueKey::SIZE_HINT * 2 + + std::mem::size_of::()]; + + fn serialize(&self, buffer: &mut T) { + self.shard_ident.serialize(buffer); + self.partition.serialize(buffer); + self.min_message.serialize(buffer); + self.max_message.serialize(buffer); + buffer.write_raw_slice(&self.index.to_le_bytes()); + } + + fn deserialize(reader: &mut &[u8]) -> Self { + if reader.len() < Self::SIZE_HINT { + panic!("Insufficient data for deserialization"); + } + + let shard_ident = ShardIdent::deserialize(reader); + let partition = QueuePartition::deserialize(reader); + let min_message = QueueKey::deserialize(reader); + let max_message = QueueKey::deserialize(reader); + + let mut index_bytes = [0u8; std::mem::size_of::()]; + index_bytes.copy_from_slice(&reader[..std::mem::size_of::()]); + let index = u64::from_le_bytes(index_bytes); + *reader = &reader[std::mem::size_of::()..]; + + Self { + shard_ident, + partition, + min_message, + max_message, + index, + } + } +} + +pub struct QueueRange { + pub shard_ident: ShardIdent, + pub partition: QueuePartition, + pub from: QueueKey, + pub to: QueueKey, +}