From b7b2b34a3b35b7feaba12974a0951601186b753b Mon Sep 17 00:00:00 2001 From: Alexey Pashinov Date: Tue, 22 Oct 2024 19:33:51 +0200 Subject: [PATCH] fix(core): fix `get_block_full` rpc response if block not found (#364) --- Cargo.lock | 4 +- control/src/server.rs | 12 ++-- core/src/blockchain_rpc/service.rs | 87 ++++++++++++++++----------- core/src/proto/blockchain.rs | 20 +++---- core/tests/archives.rs | 4 +- storage/src/store/block/mod.rs | 46 +++++++------- util/src/tl.rs | 96 ++++++++++++++++++++++++++++++ 7 files changed, 194 insertions(+), 75 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 46211265f..8b46eca6b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3865,9 +3865,9 @@ dependencies = [ [[package]] name = "weedb" -version = "0.3.6" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1ec3f3778464cc59970fbf3d655c5e681d40ebe2c0ab531af2a63421dcde648" +checksum = "bbb8343d7de3bbc3a78eb522a63bd00701c7c8daac8d9c96e5aa645901a9ec34" dependencies = [ "librocksdb-sys", "metrics", diff --git a/control/src/server.rs b/control/src/server.rs index 76339c56a..af2cd29f2 100644 --- a/control/src/server.rs +++ b/control/src/server.rs @@ -200,7 +200,7 @@ impl proto::ControlServer for ControlServer { return Ok(proto::BlockResponse::NotFound); }; - let data = blocks.load_block_data_raw(&handle).await?; + let data = blocks.load_block_data_raw(&handle).await?.to_vec(); Ok(proto::BlockResponse::Found { data }) } @@ -216,7 +216,7 @@ impl proto::ControlServer for ControlServer { return Ok(proto::BlockResponse::NotFound); }; - let data = blocks.load_block_proof_raw(&handle).await?; + let data = blocks.load_block_proof_raw(&handle).await?.to_vec(); Ok(proto::BlockResponse::Found { data }) } @@ -232,7 +232,7 @@ impl proto::ControlServer for ControlServer { return Ok(proto::BlockResponse::NotFound); }; - let data = blocks.load_queue_diff_raw(&handle).await?; + let data = blocks.load_queue_diff_raw(&handle).await?.to_vec(); Ok(proto::BlockResponse::Found { data }) } @@ -267,8 +267,10 @@ impl proto::ControlServer for ControlServer { ) -> ServerResult { let blocks = self.inner.storage.block_storage(); - let data = blocks.get_archive_chunk(req.archive_id, req.offset).await?; - + let data = blocks + .get_archive_chunk(req.archive_id, req.offset) + .await? + .to_vec(); Ok(proto::ArchiveSliceResponse { data }) } diff --git a/core/src/blockchain_rpc/service.rs b/core/src/blockchain_rpc/service.rs index 2f14963d3..800cbbc75 100644 --- a/core/src/blockchain_rpc/service.rs +++ b/core/src/blockchain_rpc/service.rs @@ -10,6 +10,7 @@ use tycho_network::{try_handle_prefix, InboundRequestMeta, Response, Service, Se use tycho_storage::{ArchiveId, BlockConnection, KeyBlocksDirection, PersistentStateKind, Storage}; use tycho_util::futures::BoxFutureOrNoop; use tycho_util::metrics::HistogramGuard; +use tycho_util::tl::BytesLike; use crate::blockchain_rpc::{BAD_REQUEST_ERROR_CODE, INTERNAL_ERROR_CODE, NOT_FOUND_ERROR_CODE}; use crate::proto::blockchain::*; @@ -404,7 +405,10 @@ impl Inner { } } - async fn handle_get_block_full(&self, req: &rpc::GetBlockFull) -> overlay::Response { + async fn handle_get_block_full( + &self, + req: &rpc::GetBlockFull, + ) -> overlay::Response>>> { let label = [("method", "getBlockFull")]; let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label); @@ -420,7 +424,7 @@ impl Inner { async fn handle_get_next_block_full( &self, req: &rpc::GetNextBlockFull, - ) -> overlay::Response { + ) -> overlay::Response>>> { let label = [("method", "getNextBlockFull")]; let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label); @@ -447,13 +451,17 @@ impl Inner { } } - fn handle_get_block_data_chunk(&self, req: &rpc::GetBlockDataChunk) -> overlay::Response { + fn handle_get_block_data_chunk( + &self, + req: &rpc::GetBlockDataChunk, + ) -> overlay::Response>>> { let label = [("method", "getBlockDataChunk")]; let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label); let block_storage = self.storage.block_storage(); match block_storage.get_block_data_chunk(&req.block_id, req.offset) { - Ok(data) => overlay::Response::Ok(Data { data: data.into() }), + Ok(Some(data)) => overlay::Response::Ok(Data { data: data.into() }), + Ok(None) => overlay::Response::Err(NOT_FOUND_ERROR_CODE), Err(e) => { tracing::warn!("get_block_data_chunk failed: {e:?}"); overlay::Response::Err(INTERNAL_ERROR_CODE) @@ -464,7 +472,7 @@ impl Inner { async fn handle_get_key_block_proof( &self, req: &rpc::GetKeyBlockProof, - ) -> overlay::Response { + ) -> overlay::Response>>> { let label = [("method", "getKeyBlockProof")]; let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label); @@ -534,7 +542,7 @@ impl Inner { async fn handle_get_archive_chunk( &self, req: &rpc::GetArchiveChunk, - ) -> overlay::Response { + ) -> overlay::Response>>> { let label = [("method", "getArchiveChunk")]; let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label); @@ -599,41 +607,50 @@ impl Inner { } impl Inner { - async fn get_block_full(&self, block_id: &BlockId) -> anyhow::Result { + async fn get_block_full( + &self, + block_id: &BlockId, + ) -> anyhow::Result>>> { let block_handle_storage = self.storage().block_handle_storage(); let block_storage = self.storage().block_storage(); - Ok(match block_handle_storage.load_handle(block_id) { - Some(handle) if handle.has_all_block_parts() => { - let data_chunk_size = block_storage.block_data_chunk_size(); - let data = block_storage.get_block_data_chunk(block_id, 0)?; - let data_size = if data.len() < data_chunk_size.get() as usize { - // NOTE: Skip one RocksDB read for relatively small blocks - // Average block size is 4KB, while the chunk size is 1MB. - data.len() as u32 - } else { - block_storage.get_block_data_size(block_id)? - }; - - let block = BlockData { - data: data.into(), - size: NonZeroU32::new(data_size).expect("shouldn't happen"), - chunk_size: data_chunk_size, - }; + let handle = match block_handle_storage.load_handle(block_id) { + Some(handle) if handle.has_all_block_parts() => handle, + _ => return Ok(BlockFull::NotFound), + }; - let (proof, queue_diff) = tokio::join!( - block_storage.load_block_proof_raw(&handle), - block_storage.load_queue_diff_raw(&handle) - ); + let Some(data) = block_storage.get_block_data_chunk(block_id, 0)? else { + return Ok(BlockFull::NotFound); + }; - BlockFull::Found { - block_id: *block_id, - block, - proof: proof?.into(), - queue_diff: queue_diff?.into(), - } + let data_chunk_size = block_storage.block_data_chunk_size(); + let data_size = if data.len() < data_chunk_size.get() as usize { + // NOTE: Skip one RocksDB read for relatively small blocks + // Average block size is 4KB, while the chunk size is 1MB. + data.len() as u32 + } else { + match block_storage.get_block_data_size(block_id)? { + Some(size) => size, + None => return Ok(BlockFull::NotFound), } - _ => BlockFull::NotFound, + }; + + let block = BlockData { + data: data.into(), + size: NonZeroU32::new(data_size).expect("shouldn't happen"), + chunk_size: data_chunk_size, + }; + + let (proof, queue_diff) = tokio::join!( + block_storage.load_block_proof_raw(&handle), + block_storage.load_queue_diff_raw(&handle) + ); + + Ok(BlockFull::Found { + block_id: *block_id, + block, + proof: proof?.into(), + queue_diff: queue_diff?.into(), }) } diff --git a/core/src/proto/blockchain.rs b/core/src/proto/blockchain.rs index 528245d49..3602bfe07 100644 --- a/core/src/proto/blockchain.rs +++ b/core/src/proto/blockchain.rs @@ -14,8 +14,8 @@ pub struct OverlayIdData { #[derive(Debug, Clone, PartialEq, Eq, TlRead, TlWrite)] #[tl(boxed, id = "blockchain.data", scheme = "proto.tl")] -pub struct Data { - pub data: Bytes, +pub struct Data { + pub data: T, } #[derive(Debug, Clone, PartialEq, Eq, TlRead, TlWrite)] @@ -28,14 +28,14 @@ pub struct KeyBlockIds { #[derive(Debug, Clone, PartialEq, Eq, TlRead, TlWrite)] #[tl(boxed, scheme = "proto.tl")] -pub enum BlockFull { +pub enum BlockFull { #[tl(id = "blockchain.blockFull.found")] Found { #[tl(with = "tl_block_id")] block_id: everscale_types::models::BlockId, - block: BlockData, - proof: Bytes, - queue_diff: Bytes, + block: BlockData, + proof: T, + queue_diff: T, }, #[tl(id = "blockchain.blockFull.notFound")] NotFound, @@ -43,9 +43,9 @@ pub enum BlockFull { #[derive(Debug, Clone, PartialEq, Eq, TlRead, TlWrite)] #[tl(boxed, scheme = "proto.tl")] -pub enum KeyBlockProof { +pub enum KeyBlockProof { #[tl(id = "blockchain.keyBlockProof.found")] - Found { proof: Bytes }, + Found { proof: T }, #[tl(id = "blockchain.keyBlockProof.notFound")] NotFound, } @@ -85,8 +85,8 @@ pub struct MessageBroadcastRef<'tl> { #[derive(Debug, Clone, PartialEq, Eq, TlRead, TlWrite)] #[tl(boxed, id = "blockchain.blockData", scheme = "proto.tl")] -pub struct BlockData { - pub data: Bytes, +pub struct BlockData { + pub data: T, pub size: NonZeroU32, pub chunk_size: NonZeroU32, } diff --git a/core/tests/archives.rs b/core/tests/archives.rs index 68c71bf8a..750cb00b2 100644 --- a/core/tests/archives.rs +++ b/core/tests/archives.rs @@ -245,7 +245,7 @@ async fn archives() -> Result<()> { .block_storage() .get_archive_chunk(archive_id, offset as u64) .await?; - expected_archive_data.extend(chunk); + expected_archive_data.extend_from_slice(&chunk); } assert_eq!(archive_data, expected_archive_data); @@ -453,7 +453,7 @@ async fn check_archive( .block_storage() .get_archive_chunk(archive_id, offset as u64) .await?; - got_archive.extend(chunk); + got_archive.extend_from_slice(&chunk); } let original_decompressed = decompress(original_archive); diff --git a/storage/src/store/block/mod.rs b/storage/src/store/block/mod.rs index 18348531d..6fabe4d18 100644 --- a/storage/src/store/block/mod.rs +++ b/storage/src/store/block/mod.rs @@ -22,8 +22,8 @@ use tycho_util::compression::ZstdCompressStream; use tycho_util::metrics::HistogramGuard; use tycho_util::sync::rayon_run; use tycho_util::FastHashSet; -use weedb::rocksdb; use weedb::rocksdb::IteratorMode; +use weedb::{rocksdb, OwnedPinnableSlice}; pub use self::package_entry::{BlockDataEntryKey, PackageEntryKey, PartialBlockId}; use crate::db::*; @@ -324,7 +324,7 @@ impl BlockStorage { } } - pub async fn load_block_data_raw(&self, handle: &BlockHandle) -> Result> { + pub async fn load_block_data_raw(&self, handle: &BlockHandle) -> Result { if !handle.has_data() { return Err(BlockStorageError::BlockDataNotFound.into()); } @@ -453,7 +453,7 @@ impl BlockStorage { BlockProofStuff::deserialize(handle.id(), raw_proof.as_ref()) } - pub async fn load_block_proof_raw(&self, handle: &BlockHandle) -> Result> { + pub async fn load_block_proof_raw(&self, handle: &BlockHandle) -> Result { if !handle.has_proof() { return Err(BlockStorageError::BlockProofNotFound.into()); } @@ -520,7 +520,7 @@ impl BlockStorage { QueueDiffStuff::deserialize(handle.id(), raw_diff.as_ref()) } - pub async fn load_queue_diff_raw(&self, handle: &BlockHandle) -> Result> { + pub async fn load_queue_diff_raw(&self, handle: &BlockHandle) -> Result { if !handle.has_queue_diff() { return Err(BlockStorageError::QueueDiffNotFound.into()); } @@ -640,7 +640,7 @@ impl BlockStorage { } /// Loads an archive chunk. - pub async fn get_archive_chunk(&self, id: u32, offset: u64) -> Result> { + pub async fn get_archive_chunk(&self, id: u32, offset: u64) -> Result { let chunk_size = self.archive_chunk_size().get() as u64; if offset % chunk_size != 0 { return Err(BlockStorageError::InvalidOffset.into()); @@ -658,10 +658,11 @@ impl BlockStorage { .get(key.as_slice())? .ok_or(BlockStorageError::ArchiveNotFound)?; - Ok(chunk.to_vec()) + // SAFETY: A value was received from the same RocksDB instance. + Ok(unsafe { OwnedPinnableSlice::new(self.db.rocksdb().clone(), chunk) }) } - pub fn get_block_data_size(&self, block_id: &BlockId) -> Result { + pub fn get_block_data_size(&self, block_id: &BlockId) -> Result> { let key = BlockDataEntryKey { block_id: block_id.into(), chunk_index: BLOCK_DATA_SIZE_MAGIC, @@ -670,13 +671,16 @@ impl BlockStorage { .db .block_data_entries .get(key.to_vec())? - .map(|slice| u32::from_le_bytes(slice.as_ref().try_into().unwrap())) - .ok_or(BlockStorageError::BlockNotFound)?; + .map(|slice| u32::from_le_bytes(slice.as_ref().try_into().unwrap())); Ok(size) } - pub fn get_block_data_chunk(&self, block_id: &BlockId, offset: u32) -> Result> { + pub fn get_block_data_chunk( + &self, + block_id: &BlockId, + offset: u32, + ) -> Result> { let chunk_size = self.block_data_chunk_size().get(); if offset % chunk_size != 0 { return Err(BlockStorageError::InvalidOffset.into()); @@ -687,13 +691,10 @@ impl BlockStorage { chunk_index: offset / chunk_size, }; - let chunk = self - .db - .block_data_entries - .get(key.to_vec())? - .ok_or(BlockStorageError::BlockNotFound)?; - - Ok(chunk.to_vec()) + Ok(self.db.block_data_entries.get(key.to_vec())?.map(|value| { + // SAFETY: A value was received from the same RocksDB instance. + unsafe { OwnedPinnableSlice::new(self.db.rocksdb().clone(), value) } + })) } // === GC stuff === @@ -828,7 +829,11 @@ impl BlockStorage { Ok(()) } - async fn get_data(&self, handle: &BlockHandle, id: &PackageEntryKey) -> Result> { + async fn get_data( + &self, + handle: &BlockHandle, + id: &PackageEntryKey, + ) -> Result { let _lock = match id.ty { ArchiveEntryType::Block => handle.block_data_lock(), ArchiveEntryType::Proof => handle.proof_data_lock(), @@ -838,7 +843,8 @@ impl BlockStorage { .await; match self.db.package_entries.get(id.to_vec())? { - Some(a) => Ok(a.to_vec()), + // SAFETY: A value was received from the same RocksDB instance. + Some(value) => Ok(unsafe { OwnedPinnableSlice::new(self.db.rocksdb().clone(), value) }), None => Err(BlockStorageError::PackageEntryNotFound.into()), } } @@ -1419,8 +1425,6 @@ struct PreparedArchiveId { enum BlockStorageError { #[error("Archive not found")] ArchiveNotFound, - #[error("Block not found")] - BlockNotFound, #[error("Block data not found")] BlockDataNotFound, #[error("Block proof not found")] diff --git a/util/src/tl.rs b/util/src/tl.rs index a0c342cef..42521a750 100644 --- a/util/src/tl.rs +++ b/util/src/tl.rs @@ -68,6 +68,102 @@ pub mod signature_arc { } } +/// A [`Bytes`]-like object wrapper. +#[repr(transparent)] +pub struct BytesLike(pub T); + +impl BytesLike { + pub fn into_inner(self) -> T { + self.0 + } +} + +impl Default for BytesLike { + #[inline] + fn default() -> Self { + Self(T::default()) + } +} + +impl std::ops::Deref for BytesLike { + type Target = T; + + #[inline] + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl std::ops::DerefMut for BytesLike { + #[inline] + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl> std::fmt::Debug for BytesLike { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::fmt::Debug::fmt(self.0.as_ref(), f) + } +} + +impl> Eq for BytesLike {} + +impl> PartialEq for BytesLike { + fn eq(&self, other: &Self) -> bool { + self.0.as_ref().eq(other.0.as_ref()) + } +} + +impl Clone for BytesLike { + #[inline] + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl> TlWrite for BytesLike { + type Repr = tl_proto::Boxed; + + fn max_size_hint(&self) -> usize { + tl_proto::bytes_max_size_hint(self.0.as_ref().len()) + } + + fn write_to(&self, packet: &mut P) { + self.0.as_ref().write_to(packet); + } +} + +impl<'a, T: TlRead<'a>> TlRead<'a> for BytesLike { + type Repr = tl_proto::Boxed; + + fn read_from(packet: &'a [u8], offset: &mut usize) -> tl_proto::TlResult { + T::read_from(packet, offset).map(Self) + } +} + +impl From for BytesLike { + #[inline] + fn from(value: T) -> Self { + Self(value) + } +} + +impl + 'static> From> for Bytes { + fn from(value: BytesLike) -> Self { + castaway::match_type!(value.0, { + Bytes as v => v, + Vec as v => Bytes::from(v), + Box<[u8]> as v => Bytes::from(v), + String as v => Bytes::from(v), + &'static [u8] as v => Bytes::from_static(v), + &'static str as v => Bytes::from_static(v.as_bytes()), + s => Bytes::copy_from_slice(s.as_ref()), + }) + } +} + pub struct VecWithMaxLen; impl VecWithMaxLen {