Skip to content

Commit 0708f12

Browse files
committed
Add Linux-specific multi_get implementation for efficient key retrieval
1 parent 268edc7 commit 0708f12

File tree

6 files changed

+699
-12
lines changed

6 files changed

+699
-12
lines changed

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ tempfile = "3.20.0"
3939
varint-rs = "2.2.0"
4040
xxhash-rust = { version = "0.8.15", features = ["xxh3"] }
4141

42+
[target.'cfg(target_os = "linux")'.dependencies]
43+
rustix-uring = { version = "0.6.0" }
44+
rustix = "1.1.2"
45+
4246
[dev-dependencies]
4347
criterion = { version = "0.7.0", features = ["html_reports"] }
4448
fs_extra = "1.3.0"

src/table/block/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,15 @@ impl Block {
134134
compression: CompressionType,
135135
) -> crate::Result<Self> {
136136
let buf = crate::file::read_exact(file, *handle.offset(), handle.size() as usize)?;
137+
Self::from_slice(buf, handle, compression)
138+
}
137139

140+
/// Reads a block from a slice.
141+
pub fn from_slice(
142+
buf: Slice,
143+
handle: BlockHandle,
144+
compression: CompressionType,
145+
) -> crate::Result<Self> {
138146
let header = Header::decode_from(&mut &buf[..])?;
139147

140148
#[expect(clippy::indexing_slicing)]

src/table/mod.rs

Lines changed: 99 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ use std::{
4949
path::PathBuf,
5050
sync::Arc,
5151
};
52-
use util::load_block;
52+
use util::{load_block, load_block_pure, BlockOutput};
5353

5454
#[cfg(feature = "metrics")]
5555
use crate::metrics::Metrics;
@@ -157,7 +157,7 @@ impl Table {
157157

158158
/// Gets the global table ID.
159159
#[must_use]
160-
fn global_id(&self) -> GlobalTableId {
160+
pub(crate) fn global_id(&self) -> GlobalTableId {
161161
(self.tree_id, self.id()).into()
162162
}
163163

@@ -213,6 +213,18 @@ impl Table {
213213
)
214214
}
215215

216+
fn load_block_pure(&self, handle: &BlockHandle, block_type: BlockType) -> BlockOutput {
217+
load_block_pure(
218+
self.global_id(),
219+
&self.descriptor_table,
220+
&self.cache,
221+
handle,
222+
block_type,
223+
#[cfg(feature = "metrics")]
224+
&self.metrics,
225+
)
226+
}
227+
216228
fn load_data_block(&self, handle: &BlockHandle) -> crate::Result<DataBlock> {
217229
self.load_block(
218230
handle,
@@ -625,3 +637,88 @@ impl Table {
625637
// self.metadata.tombstone_count as f32 / self.metadata.key_count as f32
626638
}
627639
}
640+
641+
pub use pure::*;
642+
643+
pub mod pure {
644+
use super::*;
645+
use crate::table::Io::{FilterBlockFd, FilterBlockRead};
646+
647+
#[derive(Debug, Clone)]
648+
pub enum Io {
649+
FilterBlockFd {
650+
block_handle: BlockHandle,
651+
},
652+
FilterBlockRead {
653+
block_handle: BlockHandle,
654+
file: Arc<File>,
655+
},
656+
PointRead,
657+
}
658+
659+
pub enum Output {
660+
Pure(Option<InternalValue>),
661+
Io(Io),
662+
}
663+
664+
impl Table {
665+
pub fn pure_get(&self, key: &[u8], seqno: SeqNo, key_hash: u64) -> crate::Result<Output> {
666+
#[cfg(feature = "metrics")]
667+
use std::sync::atomic::Ordering::Relaxed;
668+
if (self.metadata.seqnos.0 + self.global_seqno()) >= seqno {
669+
return Ok(Output::Pure(None));
670+
}
671+
672+
let handle_loadable_filter = |handle: BlockHandle| -> crate::Result<_> {
673+
match self.load_block_pure(&handle, BlockType::Filter) {
674+
BlockOutput::Block(block) => {
675+
let block = FilterBlock::new(block);
676+
if !block.maybe_contains_hash(key_hash)? {
677+
#[cfg(feature = "metrics")]
678+
self.metrics.io_skipped_by_filter.fetch_add(1, Relaxed);
679+
680+
Ok(Output::Pure(None))
681+
} else {
682+
Ok(Output::Io(Io::PointRead))
683+
}
684+
}
685+
BlockOutput::OpenFd => Ok(Output::Io(FilterBlockFd {
686+
block_handle: handle,
687+
})),
688+
BlockOutput::ReadFile(file) => Ok(Output::Io(FilterBlockRead {
689+
block_handle: handle,
690+
file,
691+
})),
692+
}
693+
};
694+
695+
if let Some(filter_block) = &self.pinned_filter_block {
696+
#[cfg(feature = "metrics")]
697+
self.metrics.filter_queries.fetch_add(1, Relaxed);
698+
699+
if !filter_block.maybe_contains_hash(key_hash)? {
700+
#[cfg(feature = "metrics")]
701+
self.metrics.io_skipped_by_filter.fetch_add(1, Relaxed);
702+
703+
return Ok(Output::Pure(None));
704+
}
705+
} else if let Some(filter_idx) = &self.pinned_filter_index {
706+
let mut iter = filter_idx.iter();
707+
iter.seek(key, seqno);
708+
709+
if let Some(filter_block_handle) = iter.next() {
710+
let filter_block_handle =
711+
filter_block_handle.materialize(filter_idx.as_slice());
712+
let handle = filter_block_handle.into_inner();
713+
return handle_loadable_filter(handle);
714+
}
715+
} else if let Some(_filter_tli_handle) = &self.regions.filter_tli {
716+
unimplemented!("unpinned filter TLI not supported");
717+
} else if let Some(filter_block_handle) = &self.regions.filter {
718+
return handle_loadable_filter(*filter_block_handle);
719+
}
720+
721+
Ok(Output::Io(Io::PointRead))
722+
}
723+
}
724+
}

src/table/util.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,68 @@ pub fn compare_prefixed_slice(prefix: &[u8], suffix: &[u8], needle: &[u8]) -> st
170170
suffix.cmp(needle)
171171
}
172172

173+
mod pure {
174+
use crate::table::block::BlockType;
175+
use crate::table::{Block, BlockHandle};
176+
use crate::{Cache, DescriptorTable, GlobalTableId};
177+
use std::sync::Arc;
178+
179+
pub enum Output {
180+
Block(Block),
181+
OpenFd,
182+
ReadFile(Arc<std::fs::File>),
183+
}
184+
185+
pub fn load_block_pure(
186+
table_id: GlobalTableId,
187+
// path: &Path,
188+
descriptor_table: &DescriptorTable,
189+
cache: &Cache,
190+
handle: &BlockHandle,
191+
block_type: BlockType,
192+
// compression: CompressionType,
193+
#[cfg(feature = "metrics")] metrics: &crate::metrics::Metrics,
194+
) -> Output {
195+
#[cfg(feature = "metrics")]
196+
use std::sync::atomic::Ordering::Relaxed;
197+
198+
log::trace!("load {block_type:?} block {handle:?}");
199+
200+
if let Some(block) = cache.get_block(table_id, handle.offset()) {
201+
#[cfg(feature = "metrics")]
202+
match block_type {
203+
BlockType::Filter => {
204+
metrics.filter_block_load_cached.fetch_add(1, Relaxed);
205+
}
206+
BlockType::Index => {
207+
metrics.index_block_load_cached.fetch_add(1, Relaxed);
208+
}
209+
BlockType::Data | BlockType::Meta => {
210+
metrics.data_block_load_cached.fetch_add(1, Relaxed);
211+
}
212+
_ => {}
213+
}
214+
215+
return Output::Block(block);
216+
}
217+
218+
let cached_fd = descriptor_table.access_for_table(&table_id);
219+
if let Some(fd) = cached_fd {
220+
#[cfg(feature = "metrics")]
221+
metrics.table_file_opened_cached.fetch_add(1, Relaxed);
222+
223+
Output::ReadFile(fd)
224+
} else {
225+
#[cfg(feature = "metrics")]
226+
metrics.table_file_opened_uncached.fetch_add(1, Relaxed);
227+
228+
Output::OpenFd
229+
}
230+
}
231+
}
232+
233+
pub use pure::{load_block_pure, Output as BlockOutput};
234+
173235
#[cfg(test)]
174236
mod tests {
175237
use super::*;

src/tree/mod.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@ pub mod ingest;
66
pub mod inner;
77
pub mod sealed;
88

9+
#[cfg(target_os = "linux")]
10+
mod multi_get_linux;
11+
12+
#[cfg(feature = "metrics")]
13+
use crate::metrics::Metrics;
914
use crate::{
1015
compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy},
1116
config::Config,
@@ -23,15 +28,13 @@ use crate::{
2328
ValueType,
2429
};
2530
use inner::{TreeId, TreeInner};
31+
use rustc_hash::FxHashMap;
2632
use std::{
2733
ops::{Bound, RangeBounds},
2834
path::Path,
2935
sync::{Arc, Mutex, RwLock},
3036
};
3137

32-
#[cfg(feature = "metrics")]
33-
use crate::metrics::Metrics;
34-
3538
/// Iterator value guard
3639
pub struct Guard(crate::Result<(UserKey, UserValue)>);
3740

@@ -713,7 +716,7 @@ impl Tree {
713716
&super_version.version,
714717
&needs_resolution,
715718
seqno,
716-
|value, idx| result[idx] = Some(mapper(value)),
719+
|value, idx| result[idx] = ignore_tombstone_value(value).map(mapper),
717720
)?;
718721
Ok(result)
719722
}
@@ -746,10 +749,10 @@ impl Tree {
746749
seqno: SeqNo,
747750
mut resolve: impl FnMut(InternalValue, usize),
748751
) -> crate::Result<()> {
749-
// #[cfg(target_os = "linux")]
750-
// {
751-
//
752-
// }
752+
#[cfg(target_os = "linux")]
753+
{
754+
multi_get_linux::multi_get(version, keys_and_indices, seqno, &mut resolve)
755+
}
753756
// todo actually windows also supports IoRing https://learn.microsoft.com/en-us/windows/win32/api/ioringapi/
754757
#[cfg(not(target_os = "linux"))]
755758
{
@@ -761,9 +764,8 @@ impl Tree {
761764
resolve(value, *idx)
762765
}
763766
Ok(())
764-
})?;
767+
})
765768
}
766-
Ok(())
767769
}
768770

769771
fn get_internal_entry_from_sealed_memtables(

0 commit comments

Comments
 (0)