Skip to content

Commit

Permalink
fix(core): fix get_block_full rpc response if block not found (#364)
Browse files Browse the repository at this point in the history
  • Loading branch information
pashinov authored Oct 22, 2024
1 parent 087de8e commit b7b2b34
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 75 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 7 additions & 5 deletions control/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl proto::ControlServer for ControlServer {
return Ok(proto::BlockResponse::NotFound);
};

let data = blocks.load_block_data_raw(&handle).await?;
let data = blocks.load_block_data_raw(&handle).await?.to_vec();
Ok(proto::BlockResponse::Found { data })
}

Expand All @@ -216,7 +216,7 @@ impl proto::ControlServer for ControlServer {
return Ok(proto::BlockResponse::NotFound);
};

let data = blocks.load_block_proof_raw(&handle).await?;
let data = blocks.load_block_proof_raw(&handle).await?.to_vec();
Ok(proto::BlockResponse::Found { data })
}

Expand All @@ -232,7 +232,7 @@ impl proto::ControlServer for ControlServer {
return Ok(proto::BlockResponse::NotFound);
};

let data = blocks.load_queue_diff_raw(&handle).await?;
let data = blocks.load_queue_diff_raw(&handle).await?.to_vec();
Ok(proto::BlockResponse::Found { data })
}

Expand Down Expand Up @@ -267,8 +267,10 @@ impl proto::ControlServer for ControlServer {
) -> ServerResult<proto::ArchiveSliceResponse> {
let blocks = self.inner.storage.block_storage();

let data = blocks.get_archive_chunk(req.archive_id, req.offset).await?;

let data = blocks
.get_archive_chunk(req.archive_id, req.offset)
.await?
.to_vec();
Ok(proto::ArchiveSliceResponse { data })
}

Expand Down
87 changes: 52 additions & 35 deletions core/src/blockchain_rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tycho_network::{try_handle_prefix, InboundRequestMeta, Response, Service, Se
use tycho_storage::{ArchiveId, BlockConnection, KeyBlocksDirection, PersistentStateKind, Storage};
use tycho_util::futures::BoxFutureOrNoop;
use tycho_util::metrics::HistogramGuard;
use tycho_util::tl::BytesLike;

use crate::blockchain_rpc::{BAD_REQUEST_ERROR_CODE, INTERNAL_ERROR_CODE, NOT_FOUND_ERROR_CODE};
use crate::proto::blockchain::*;
Expand Down Expand Up @@ -404,7 +405,10 @@ impl<B> Inner<B> {
}
}

async fn handle_get_block_full(&self, req: &rpc::GetBlockFull) -> overlay::Response<BlockFull> {
async fn handle_get_block_full(
&self,
req: &rpc::GetBlockFull,
) -> overlay::Response<BlockFull<BytesLike<impl AsRef<[u8]>>>> {
let label = [("method", "getBlockFull")];
let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);

Expand All @@ -420,7 +424,7 @@ impl<B> Inner<B> {
async fn handle_get_next_block_full(
&self,
req: &rpc::GetNextBlockFull,
) -> overlay::Response<BlockFull> {
) -> overlay::Response<BlockFull<BytesLike<impl AsRef<[u8]>>>> {
let label = [("method", "getNextBlockFull")];
let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);

Expand All @@ -447,13 +451,17 @@ impl<B> Inner<B> {
}
}

fn handle_get_block_data_chunk(&self, req: &rpc::GetBlockDataChunk) -> overlay::Response<Data> {
fn handle_get_block_data_chunk(
&self,
req: &rpc::GetBlockDataChunk,
) -> overlay::Response<Data<BytesLike<impl AsRef<[u8]>>>> {
let label = [("method", "getBlockDataChunk")];
let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);

let block_storage = self.storage.block_storage();
match block_storage.get_block_data_chunk(&req.block_id, req.offset) {
Ok(data) => overlay::Response::Ok(Data { data: data.into() }),
Ok(Some(data)) => overlay::Response::Ok(Data { data: data.into() }),
Ok(None) => overlay::Response::Err(NOT_FOUND_ERROR_CODE),
Err(e) => {
tracing::warn!("get_block_data_chunk failed: {e:?}");
overlay::Response::Err(INTERNAL_ERROR_CODE)
Expand All @@ -464,7 +472,7 @@ impl<B> Inner<B> {
async fn handle_get_key_block_proof(
&self,
req: &rpc::GetKeyBlockProof,
) -> overlay::Response<KeyBlockProof> {
) -> overlay::Response<KeyBlockProof<BytesLike<impl AsRef<[u8]>>>> {
let label = [("method", "getKeyBlockProof")];
let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);

Expand Down Expand Up @@ -534,7 +542,7 @@ impl<B> Inner<B> {
async fn handle_get_archive_chunk(
&self,
req: &rpc::GetArchiveChunk,
) -> overlay::Response<Data> {
) -> overlay::Response<Data<BytesLike<impl AsRef<[u8]>>>> {
let label = [("method", "getArchiveChunk")];
let _hist = HistogramGuard::begin_with_labels(RPC_METHOD_TIMINGS_METRIC, &label);

Expand Down Expand Up @@ -599,41 +607,50 @@ impl<B> Inner<B> {
}

impl<B> Inner<B> {
async fn get_block_full(&self, block_id: &BlockId) -> anyhow::Result<BlockFull> {
async fn get_block_full(
&self,
block_id: &BlockId,
) -> anyhow::Result<BlockFull<BytesLike<impl AsRef<[u8]>>>> {
let block_handle_storage = self.storage().block_handle_storage();
let block_storage = self.storage().block_storage();

Ok(match block_handle_storage.load_handle(block_id) {
Some(handle) if handle.has_all_block_parts() => {
let data_chunk_size = block_storage.block_data_chunk_size();
let data = block_storage.get_block_data_chunk(block_id, 0)?;
let data_size = if data.len() < data_chunk_size.get() as usize {
// NOTE: Skip one RocksDB read for relatively small blocks
// Average block size is 4KB, while the chunk size is 1MB.
data.len() as u32
} else {
block_storage.get_block_data_size(block_id)?
};

let block = BlockData {
data: data.into(),
size: NonZeroU32::new(data_size).expect("shouldn't happen"),
chunk_size: data_chunk_size,
};
let handle = match block_handle_storage.load_handle(block_id) {
Some(handle) if handle.has_all_block_parts() => handle,
_ => return Ok(BlockFull::NotFound),
};

let (proof, queue_diff) = tokio::join!(
block_storage.load_block_proof_raw(&handle),
block_storage.load_queue_diff_raw(&handle)
);
let Some(data) = block_storage.get_block_data_chunk(block_id, 0)? else {
return Ok(BlockFull::NotFound);
};

BlockFull::Found {
block_id: *block_id,
block,
proof: proof?.into(),
queue_diff: queue_diff?.into(),
}
let data_chunk_size = block_storage.block_data_chunk_size();
let data_size = if data.len() < data_chunk_size.get() as usize {
// NOTE: Skip one RocksDB read for relatively small blocks
// Average block size is 4KB, while the chunk size is 1MB.
data.len() as u32
} else {
match block_storage.get_block_data_size(block_id)? {
Some(size) => size,
None => return Ok(BlockFull::NotFound),
}
_ => BlockFull::NotFound,
};

let block = BlockData {
data: data.into(),
size: NonZeroU32::new(data_size).expect("shouldn't happen"),
chunk_size: data_chunk_size,
};

let (proof, queue_diff) = tokio::join!(
block_storage.load_block_proof_raw(&handle),
block_storage.load_queue_diff_raw(&handle)
);

Ok(BlockFull::Found {
block_id: *block_id,
block,
proof: proof?.into(),
queue_diff: queue_diff?.into(),
})
}

Expand Down
20 changes: 10 additions & 10 deletions core/src/proto/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ pub struct OverlayIdData {

#[derive(Debug, Clone, PartialEq, Eq, TlRead, TlWrite)]
#[tl(boxed, id = "blockchain.data", scheme = "proto.tl")]
pub struct Data {
pub data: Bytes,
pub struct Data<T = Bytes> {
pub data: T,
}

#[derive(Debug, Clone, PartialEq, Eq, TlRead, TlWrite)]
Expand All @@ -28,24 +28,24 @@ pub struct KeyBlockIds {

#[derive(Debug, Clone, PartialEq, Eq, TlRead, TlWrite)]
#[tl(boxed, scheme = "proto.tl")]
pub enum BlockFull {
pub enum BlockFull<T = Bytes> {
#[tl(id = "blockchain.blockFull.found")]
Found {
#[tl(with = "tl_block_id")]
block_id: everscale_types::models::BlockId,
block: BlockData,
proof: Bytes,
queue_diff: Bytes,
block: BlockData<T>,
proof: T,
queue_diff: T,
},
#[tl(id = "blockchain.blockFull.notFound")]
NotFound,
}

#[derive(Debug, Clone, PartialEq, Eq, TlRead, TlWrite)]
#[tl(boxed, scheme = "proto.tl")]
pub enum KeyBlockProof {
pub enum KeyBlockProof<T = Bytes> {
#[tl(id = "blockchain.keyBlockProof.found")]
Found { proof: Bytes },
Found { proof: T },
#[tl(id = "blockchain.keyBlockProof.notFound")]
NotFound,
}
Expand Down Expand Up @@ -85,8 +85,8 @@ pub struct MessageBroadcastRef<'tl> {

#[derive(Debug, Clone, PartialEq, Eq, TlRead, TlWrite)]
#[tl(boxed, id = "blockchain.blockData", scheme = "proto.tl")]
pub struct BlockData {
pub data: Bytes,
pub struct BlockData<T = Bytes> {
pub data: T,
pub size: NonZeroU32,
pub chunk_size: NonZeroU32,
}
Expand Down
4 changes: 2 additions & 2 deletions core/tests/archives.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ async fn archives() -> Result<()> {
.block_storage()
.get_archive_chunk(archive_id, offset as u64)
.await?;
expected_archive_data.extend(chunk);
expected_archive_data.extend_from_slice(&chunk);
}
assert_eq!(archive_data, expected_archive_data);

Expand Down Expand Up @@ -453,7 +453,7 @@ async fn check_archive(
.block_storage()
.get_archive_chunk(archive_id, offset as u64)
.await?;
got_archive.extend(chunk);
got_archive.extend_from_slice(&chunk);
}

let original_decompressed = decompress(original_archive);
Expand Down
46 changes: 25 additions & 21 deletions storage/src/store/block/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use tycho_util::compression::ZstdCompressStream;
use tycho_util::metrics::HistogramGuard;
use tycho_util::sync::rayon_run;
use tycho_util::FastHashSet;
use weedb::rocksdb;
use weedb::rocksdb::IteratorMode;
use weedb::{rocksdb, OwnedPinnableSlice};

pub use self::package_entry::{BlockDataEntryKey, PackageEntryKey, PartialBlockId};
use crate::db::*;
Expand Down Expand Up @@ -324,7 +324,7 @@ impl BlockStorage {
}
}

pub async fn load_block_data_raw(&self, handle: &BlockHandle) -> Result<Vec<u8>> {
pub async fn load_block_data_raw(&self, handle: &BlockHandle) -> Result<OwnedPinnableSlice> {
if !handle.has_data() {
return Err(BlockStorageError::BlockDataNotFound.into());
}
Expand Down Expand Up @@ -453,7 +453,7 @@ impl BlockStorage {
BlockProofStuff::deserialize(handle.id(), raw_proof.as_ref())
}

pub async fn load_block_proof_raw(&self, handle: &BlockHandle) -> Result<Vec<u8>> {
pub async fn load_block_proof_raw(&self, handle: &BlockHandle) -> Result<OwnedPinnableSlice> {
if !handle.has_proof() {
return Err(BlockStorageError::BlockProofNotFound.into());
}
Expand Down Expand Up @@ -520,7 +520,7 @@ impl BlockStorage {
QueueDiffStuff::deserialize(handle.id(), raw_diff.as_ref())
}

pub async fn load_queue_diff_raw(&self, handle: &BlockHandle) -> Result<Vec<u8>> {
pub async fn load_queue_diff_raw(&self, handle: &BlockHandle) -> Result<OwnedPinnableSlice> {
if !handle.has_queue_diff() {
return Err(BlockStorageError::QueueDiffNotFound.into());
}
Expand Down Expand Up @@ -640,7 +640,7 @@ impl BlockStorage {
}

/// Loads an archive chunk.
pub async fn get_archive_chunk(&self, id: u32, offset: u64) -> Result<Vec<u8>> {
pub async fn get_archive_chunk(&self, id: u32, offset: u64) -> Result<OwnedPinnableSlice> {
let chunk_size = self.archive_chunk_size().get() as u64;
if offset % chunk_size != 0 {
return Err(BlockStorageError::InvalidOffset.into());
Expand All @@ -658,10 +658,11 @@ impl BlockStorage {
.get(key.as_slice())?
.ok_or(BlockStorageError::ArchiveNotFound)?;

Ok(chunk.to_vec())
// SAFETY: A value was received from the same RocksDB instance.
Ok(unsafe { OwnedPinnableSlice::new(self.db.rocksdb().clone(), chunk) })
}

pub fn get_block_data_size(&self, block_id: &BlockId) -> Result<u32> {
pub fn get_block_data_size(&self, block_id: &BlockId) -> Result<Option<u32>> {
let key = BlockDataEntryKey {
block_id: block_id.into(),
chunk_index: BLOCK_DATA_SIZE_MAGIC,
Expand All @@ -670,13 +671,16 @@ impl BlockStorage {
.db
.block_data_entries
.get(key.to_vec())?
.map(|slice| u32::from_le_bytes(slice.as_ref().try_into().unwrap()))
.ok_or(BlockStorageError::BlockNotFound)?;
.map(|slice| u32::from_le_bytes(slice.as_ref().try_into().unwrap()));

Ok(size)
}

pub fn get_block_data_chunk(&self, block_id: &BlockId, offset: u32) -> Result<Vec<u8>> {
pub fn get_block_data_chunk(
&self,
block_id: &BlockId,
offset: u32,
) -> Result<Option<OwnedPinnableSlice>> {
let chunk_size = self.block_data_chunk_size().get();
if offset % chunk_size != 0 {
return Err(BlockStorageError::InvalidOffset.into());
Expand All @@ -687,13 +691,10 @@ impl BlockStorage {
chunk_index: offset / chunk_size,
};

let chunk = self
.db
.block_data_entries
.get(key.to_vec())?
.ok_or(BlockStorageError::BlockNotFound)?;

Ok(chunk.to_vec())
Ok(self.db.block_data_entries.get(key.to_vec())?.map(|value| {
// SAFETY: A value was received from the same RocksDB instance.
unsafe { OwnedPinnableSlice::new(self.db.rocksdb().clone(), value) }
}))
}

// === GC stuff ===
Expand Down Expand Up @@ -828,7 +829,11 @@ impl BlockStorage {
Ok(())
}

async fn get_data(&self, handle: &BlockHandle, id: &PackageEntryKey) -> Result<Vec<u8>> {
async fn get_data(
&self,
handle: &BlockHandle,
id: &PackageEntryKey,
) -> Result<OwnedPinnableSlice> {
let _lock = match id.ty {
ArchiveEntryType::Block => handle.block_data_lock(),
ArchiveEntryType::Proof => handle.proof_data_lock(),
Expand All @@ -838,7 +843,8 @@ impl BlockStorage {
.await;

match self.db.package_entries.get(id.to_vec())? {
Some(a) => Ok(a.to_vec()),
// SAFETY: A value was received from the same RocksDB instance.
Some(value) => Ok(unsafe { OwnedPinnableSlice::new(self.db.rocksdb().clone(), value) }),
None => Err(BlockStorageError::PackageEntryNotFound.into()),
}
}
Expand Down Expand Up @@ -1419,8 +1425,6 @@ struct PreparedArchiveId {
enum BlockStorageError {
#[error("Archive not found")]
ArchiveNotFound,
#[error("Block not found")]
BlockNotFound,
#[error("Block data not found")]
BlockDataNotFound,
#[error("Block proof not found")]
Expand Down
Loading

0 comments on commit b7b2b34

Please sign in to comment.