Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ tempfile = "3.20.0"
varint-rs = "2.2.0"
xxhash-rust = { version = "0.8.15", features = ["xxh3"] }

[target.'cfg(target_os = "linux")'.dependencies]
rustix-uring = { version = "0.6.0" }
rustix = "1.1.2"

[dev-dependencies]
criterion = { version = "0.7.0", features = ["html_reports"] }
fs_extra = "1.3.0"
Expand Down
11 changes: 11 additions & 0 deletions src/abstract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,17 @@ pub trait AbstractTree {
/// Will return `Err` if an IO error occurs.
fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>>;

/// Reads several keys at the given sequence number.
///
/// This default implementation performs one [`AbstractTree::get`] per key
/// and returns one entry per key, in the order the keys are yielded by
/// `keys`. Implementors may override it with a batched lookup.
///
/// # Errors
///
/// Will return `Err` if an IO error occurs; the first failing lookup
/// aborts the whole batch.
fn get_many_unsorted<'a>(
    &'a self,
    keys: impl IntoIterator<Item = &'a [u8]>,
    seqno: SeqNo,
) -> crate::Result<Vec<Option<UserValue>>> {
    // Collecting into `Result<Vec<_>, _>` short-circuits on the first error.
    keys.into_iter().map(|key| self.get(key, seqno)).collect()
}

/// Returns `true` if the tree contains the specified key.
///
/// # Examples
Expand Down
8 changes: 8 additions & 0 deletions src/table/block/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,15 @@ impl Block {
compression: CompressionType,
) -> crate::Result<Self> {
let buf = crate::file::read_exact(file, *handle.offset(), handle.size() as usize)?;
Self::from_slice(buf, handle, compression)
}

/// Reads a block from a slice.
pub fn from_slice(
buf: Slice,
handle: BlockHandle,
compression: CompressionType,
) -> crate::Result<Self> {
let header = Header::decode_from(&mut &buf[..])?;

#[expect(clippy::indexing_slicing)]
Expand Down
101 changes: 99 additions & 2 deletions src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use std::{
path::PathBuf,
sync::Arc,
};
use util::load_block;
use util::{load_block, load_block_pure, BlockOutput};

#[cfg(feature = "metrics")]
use crate::metrics::Metrics;
Expand Down Expand Up @@ -157,7 +157,7 @@ impl Table {

/// Gets the global table ID.
#[must_use]
fn global_id(&self) -> GlobalTableId {
pub(crate) fn global_id(&self) -> GlobalTableId {
// Pairs `self.tree_id` with `self.id()` and converts the tuple into a `GlobalTableId`.
(self.tree_id, self.id()).into()
}

Expand Down Expand Up @@ -213,6 +213,18 @@ impl Table {
)
}

/// Forwards to the free function `load_block_pure`, supplying this table's
/// global ID, descriptor table and cache (plus the metrics handle when the
/// `metrics` feature is enabled).
///
/// The resulting `BlockOutput` is returned unchanged.
fn load_block_pure(&self, handle: &BlockHandle, block_type: BlockType) -> BlockOutput {
load_block_pure(
self.global_id(),
&self.descriptor_table,
&self.cache,
handle,
block_type,
#[cfg(feature = "metrics")]
&self.metrics,
)
}

fn load_data_block(&self, handle: &BlockHandle) -> crate::Result<DataBlock> {
self.load_block(
handle,
Expand Down Expand Up @@ -625,3 +637,88 @@ impl Table {
// self.metadata.tombstone_count as f32 / self.metadata.key_count as f32
}
}

pub use pure::*;

pub mod pure {
    //! Point-lookup planning that consults only pinned or cached filter data
    //! and tells the caller which I/O step is still outstanding.

    use super::*;

    /// The I/O step the caller still has to perform for a lookup.
    #[derive(Debug, Clone)]
    pub enum Io {
        /// The filter block must be read, and `load_block_pure` reported
        /// `BlockOutput::OpenFd` (no file handle was handed back).
        FilterBlockFd {
            /// Location of the filter block inside the table file.
            block_handle: BlockHandle,
        },
        /// The filter block must be read from the given file handle.
        FilterBlockRead {
            /// Location of the filter block inside the table file.
            block_handle: BlockHandle,
            /// File handle returned by `load_block_pure`.
            file: Arc<File>,
        },
        /// No filter ruled the key out; a regular point read is required.
        PointRead,
    }

    /// Result of [`Table::pure_get`].
    pub enum Output {
        /// The lookup was answered without further I/O.
        Pure(Option<InternalValue>),
        /// The caller has to perform the described I/O to continue.
        Io(Io),
    }

    impl Table {
        /// Decides how a point lookup for `key` has to proceed.
        ///
        /// Returns `Output::Pure(None)` when the sequence number check or an
        /// available filter block excludes the key, and `Output::Io(..)`
        /// naming the outstanding I/O step otherwise.
        ///
        /// # Errors
        ///
        /// Will return `Err` if querying a filter block fails.
        ///
        /// # Panics
        ///
        /// Panics if the table has an unpinned filter top-level index, which
        /// is not supported.
        pub fn pure_get(&self, key: &[u8], seqno: SeqNo, key_hash: u64) -> crate::Result<Output> {
            #[cfg(feature = "metrics")]
            use std::sync::atomic::Ordering::Relaxed;

            // NOTE(review): presumably `seqnos.0` is the table's lowest sequence
            // number, so nothing in this table is visible to the snapshot - confirm.
            if (self.metadata.seqnos.0 + self.global_seqno()) >= seqno {
                return Ok(Output::Pure(None));
            }

            // Handles a filter block that is not pinned: either it is available
            // through `load_block_pure`, or the caller has to read it first.
            let handle_loadable_filter = |handle: BlockHandle| -> crate::Result<_> {
                match self.load_block_pure(&handle, BlockType::Filter) {
                    BlockOutput::Block(block) => {
                        let block = FilterBlock::new(block);

                        // Count the query exactly like the pinned-filter path
                        // does, so the two metrics stay comparable.
                        #[cfg(feature = "metrics")]
                        self.metrics.filter_queries.fetch_add(1, Relaxed);

                        if block.maybe_contains_hash(key_hash)? {
                            Ok(Output::Io(Io::PointRead))
                        } else {
                            #[cfg(feature = "metrics")]
                            self.metrics.io_skipped_by_filter.fetch_add(1, Relaxed);

                            Ok(Output::Pure(None))
                        }
                    }
                    BlockOutput::OpenFd => Ok(Output::Io(Io::FilterBlockFd {
                        block_handle: handle,
                    })),
                    BlockOutput::ReadFile(file) => Ok(Output::Io(Io::FilterBlockRead {
                        block_handle: handle,
                        file,
                    })),
                }
            };

            if let Some(filter_block) = &self.pinned_filter_block {
                #[cfg(feature = "metrics")]
                self.metrics.filter_queries.fetch_add(1, Relaxed);

                if !filter_block.maybe_contains_hash(key_hash)? {
                    #[cfg(feature = "metrics")]
                    self.metrics.io_skipped_by_filter.fetch_add(1, Relaxed);

                    return Ok(Output::Pure(None));
                }
            } else if let Some(filter_idx) = &self.pinned_filter_index {
                let mut iter = filter_idx.iter();
                iter.seek(key, seqno);

                // If the index yields no entry, fall through to a point read.
                if let Some(filter_block_handle) = iter.next() {
                    let filter_block_handle =
                        filter_block_handle.materialize(filter_idx.as_slice());
                    let handle = filter_block_handle.into_inner();
                    return handle_loadable_filter(handle);
                }
            } else if let Some(_filter_tli_handle) = &self.regions.filter_tli {
                unimplemented!("unpinned filter TLI not supported");
            } else if let Some(filter_block_handle) = &self.regions.filter {
                return handle_loadable_filter(*filter_block_handle);
            }

            Ok(Output::Io(Io::PointRead))
        }
    }
}
62 changes: 62 additions & 0 deletions src/table/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,68 @@ pub fn compare_prefixed_slice(prefix: &[u8], suffix: &[u8], needle: &[u8]) -> st
suffix.cmp(needle)
}

mod pure {
    use crate::table::block::BlockType;
    use crate::table::{Block, BlockHandle};
    use crate::{Cache, DescriptorTable, GlobalTableId};
    use std::sync::Arc;

    /// Outcome of a block lookup by `load_block_pure`.
    pub enum Output {
        /// The block was found in the block cache.
        Block(Block),
        /// The block is not cached and the descriptor table returned no file handle.
        OpenFd,
        /// The block is not cached; the descriptor table returned this file handle.
        ReadFile(Arc<std::fs::File>),
    }

    /// Looks a block up in the block cache and, on a miss, asks the
    /// descriptor table for the table's file handle.
    ///
    /// Returns `Output::Block` on a cache hit, `Output::ReadFile` when the
    /// descriptor table has a handle for `table_id`, and `Output::OpenFd`
    /// otherwise. With the `metrics` feature enabled, the matching counters
    /// are bumped.
    pub fn load_block_pure(
        table_id: GlobalTableId,
        descriptor_table: &DescriptorTable,
        cache: &Cache,
        handle: &BlockHandle,
        block_type: BlockType,
        #[cfg(feature = "metrics")] metrics: &crate::metrics::Metrics,
    ) -> Output {
        #[cfg(feature = "metrics")]
        use std::sync::atomic::Ordering::Relaxed;

        log::trace!("load {block_type:?} block {handle:?}");

        let cached_block = cache.get_block(table_id, handle.offset());

        match cached_block {
            Some(block) => {
                // Cache hit: account for it per block type.
                #[cfg(feature = "metrics")]
                match block_type {
                    BlockType::Filter => {
                        metrics.filter_block_load_cached.fetch_add(1, Relaxed);
                    }
                    BlockType::Index => {
                        metrics.index_block_load_cached.fetch_add(1, Relaxed);
                    }
                    BlockType::Data | BlockType::Meta => {
                        metrics.data_block_load_cached.fetch_add(1, Relaxed);
                    }
                    _ => {}
                }

                Output::Block(block)
            }
            None => match descriptor_table.access_for_table(&table_id) {
                Some(fd) => {
                    #[cfg(feature = "metrics")]
                    metrics.table_file_opened_cached.fetch_add(1, Relaxed);

                    Output::ReadFile(fd)
                }
                None => {
                    #[cfg(feature = "metrics")]
                    metrics.table_file_opened_uncached.fetch_add(1, Relaxed);

                    Output::OpenFd
                }
            },
        }
    }
}

pub use pure::{load_block_pure, Output as BlockOutput};

#[cfg(test)]
mod tests {
use super::*;
Expand Down
83 changes: 80 additions & 3 deletions src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ pub mod ingest;
pub mod inner;
pub mod sealed;

#[cfg(target_os = "linux")]
mod multi_get_linux;

#[cfg(feature = "metrics")]
use crate::metrics::Metrics;
use crate::{
compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy},
config::Config,
Expand All @@ -23,15 +28,13 @@ use crate::{
ValueType,
};
use inner::{TreeId, TreeInner};
use rustc_hash::FxHashMap;
use std::{
ops::{Bound, RangeBounds},
path::Path,
sync::{Arc, Mutex, RwLock},
};

#[cfg(feature = "metrics")]
use crate::metrics::Metrics;

/// Iterator value guard
pub struct Guard(crate::Result<(UserKey, UserValue)>);

Expand Down Expand Up @@ -598,6 +601,25 @@ impl AbstractTree for Tree {
.map(|x| x.value))
}

/// Batched point lookup; `keys` may be given in any order.
///
/// The returned vector has one entry per requested key, in the order the
/// keys were yielded by `keys`.
fn get_many_unsorted<'a>(
    &'a self,
    keys: impl IntoIterator<Item = &'a [u8]>,
    seqno: SeqNo,
) -> crate::Result<Vec<Option<UserValue>>> {
    // Remember each key's position in the caller's input so results can be
    // handed back in that order after the lookup, which needs sorted keys.
    let mut positioned_keys = keys.into_iter().enumerate().collect::<Vec<_>>();
    if positioned_keys.is_empty() {
        return Ok(Vec::new());
    }
    positioned_keys.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));

    let sorted_keys = positioned_keys
        .iter()
        .map(|&(_, key)| key)
        .collect::<Vec<_>>();

    let super_version = self
        .version_history
        .read()
        .expect("lock is poisoned")
        .get_version_for_snapshot(seqno);

    let sorted_values =
        Self::get_internal_entries_from_version(&super_version, &sorted_keys, seqno, |x| {
            x.value
        })?;

    // Scatter the values back to the positions of their keys in the input.
    let mut values = vec![None; sorted_values.len()];
    for ((position, _), value) in positioned_keys.into_iter().zip(sorted_values) {
        if let Some(slot) = values.get_mut(position) {
            *slot = value;
        }
    }

    Ok(values)
}

fn insert<K: Into<UserKey>, V: Into<UserValue>>(
&self,
key: K,
Expand Down Expand Up @@ -669,6 +691,36 @@ impl Tree {
Self::get_internal_entry_from_tables(&super_version.version, key, seqno)
}

/// Resolves a batch of keys against one snapshot of the tree.
///
/// Each key is looked up in the active memtable first, then in the sealed
/// memtables. The first hit is final: only keys found in neither are
/// forwarded to the table lookup, so older data can never overwrite a
/// newer memtable entry.
///
/// `keys` must be sorted. The returned vector has one slot per key, in the
/// same order as `keys`. Entries are passed through
/// `ignore_tombstone_value` before `mapper` is applied.
///
/// # Errors
///
/// Will return `Err` if the table lookup fails.
pub(crate) fn get_internal_entries_from_version<V: Clone>(
    super_version: &SuperVersion,
    keys: &[&[u8]], // keys must be sorted
    seqno: SeqNo,
    mapper: impl FnMut(InternalValue) -> V + Copy,
) -> crate::Result<Vec<Option<V>>> {
    let mut result = vec![None; keys.len()];
    let mut needs_resolution = Vec::with_capacity(keys.len());

    for ((idx, &key), res) in keys.iter().enumerate().zip(result.iter_mut()) {
        if let Some(entry) = super_version.active_memtable.get(key, seqno) {
            *res = ignore_tombstone_value(entry).map(mapper);
            // The active memtable wins; do not consult older sources.
            continue;
        }

        // Now look in sealed memtables
        if let Some(entry) =
            Self::get_internal_entry_from_sealed_memtables(super_version, key, seqno)
        {
            *res = ignore_tombstone_value(entry).map(mapper);
            continue;
        }

        // Not in any memtable: this key has to be resolved from the tables.
        needs_resolution.push((idx, key));
    }

    // Every key was answered by a memtable; skip the table lookup entirely.
    if needs_resolution.is_empty() {
        return Ok(result);
    }

    // Now look in tables... this may involve disk I/O
    Self::get_internal_entries_from_tables(
        &super_version.version,
        &needs_resolution,
        seqno,
        |value, idx| result[idx] = ignore_tombstone_value(value).map(mapper),
    )?;

    Ok(result)
}

fn get_internal_entry_from_tables(
version: &Version,
key: &[u8],
Expand All @@ -691,6 +743,31 @@ impl Tree {
Ok(None)
}

/// Looks up every `(index, key)` pair in the tables of `version` and calls
/// `resolve(value, index)` for each key that yields an entry.
///
/// On Linux the whole batch is handed to `multi_get_linux::multi_get`; on
/// other targets the keys are looked up one after another.
fn get_internal_entries_from_tables(
    version: &Version,
    keys_and_indices: &[(usize, &[u8])],
    seqno: SeqNo,
    mut resolve: impl FnMut(InternalValue, usize),
) -> crate::Result<()> {
    #[cfg(target_os = "linux")]
    {
        multi_get_linux::multi_get(version, keys_and_indices, seqno, &mut resolve)
    }
    // TODO: Windows also offers an IoRing API:
    // https://learn.microsoft.com/en-us/windows/win32/api/ioringapi/
    #[cfg(not(target_os = "linux"))]
    {
        for &(slot, key) in keys_and_indices {
            // Stop at the first failing lookup; keys without an entry are skipped.
            if let Some(found) = Self::get_internal_entry_from_tables(version, key, seqno)? {
                resolve(found, slot);
            }
        }
        Ok(())
    }
}

fn get_internal_entry_from_sealed_memtables(
super_version: &SuperVersion,
key: &[u8],
Expand Down
Loading