Skip to content

Commit

Permalink
fix:
Browse files Browse the repository at this point in the history
Signed-off-by: Yuchen Liang <yuchen@neon.tech>
  • Loading branch information
yliang412 committed Aug 14, 2024
1 parent f6b6925 commit 53b3f9f
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 56 deletions.
36 changes: 33 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pageserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ default = []
testing = ["fail/failpoints"]

[dependencies]
aligned-vec = "0.6.1"
anyhow.workspace = true
arc-swap.workspace = true
async-compression.workspace = true
Expand Down
115 changes: 62 additions & 53 deletions pageserver/src/virtual_file/dio.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
pub(crate) mod buffer;
use std::alloc::{Layout, LayoutError};
use std::mem::MaybeUninit;
use std::os::unix::ffi::OsStrExt;

// use anyhow::bail;
use bytes::Bytes;
// use nix::libc;
// use nix::libc::statx as Statx;
use bytes::{Bytes, BytesMut};

/// Allocates an aligned buffer.
pub fn alloc_aligned(len: usize, align: usize) -> Result<Box<[u8]>, LayoutError> {
Expand Down Expand Up @@ -35,58 +34,68 @@ impl TryAllocAligned for bytes::BytesMut {
}
}

// /// Direct IO alignment info.
// pub(crate) struct StatxDioAlignInfo {
// stx_dio_mem_align: usize,
// stx_dio_offset_align: usize,
// }
#[cfg(target_os = "linux")]
mod dio_mem_alignment {
use std::mem::MaybeUninit;
use std::os::unix::ffi::OsStrExt;

// /// Gets Direct IO alignment info through `statx(2)` system call.
// /// TODO: Does our machine support this?
// ///
// /// Adapted from https://gist.github.com/problame/1c35cac41b7cd617779f8aae50f97155/revisions.
// fn statx_align_info<P: AsRef<std::path::Path>>(path: P) -> anyhow::Result<StatxDioAlignInfo> {
// const REQUESTS: [(&str, u32); 2] = [
// ("STATX_BASIC_STATS", libc::STATX_BASIC_STATS),
// ("STATX_DIOALIGN", libc::STATX_DIOALIGN),
// ];
use anyhow::bail;
use nix::libc;
use nix::libc::statx as Statx;

// let mask = REQUESTS.iter().map(|(_, v)| v).fold(0, |l, r| l | r);
// let mut statx_buf = unsafe { MaybeUninit::<Statx>::zeroed().assume_init() };
// let status = unsafe {
// let c_path = path.as_ref().as_os_str().as_bytes();
// let c_path = c_path as *const _ as *mut libc::c_char;
// libc::statx(
// libc::AT_FDCWD,
// c_path,
// libc::AT_SYMLINK_NOFOLLOW,
// mask,
// &mut statx_buf as *mut Statx,
// )
// };
// if status != 0 {
// bail!("Error checking alignment: {}", status);
// }
/// Direct IO alignment info.
///
/// Holds the two direct-I/O alignment values reported by `statx(2)` for a file,
/// converted to `usize`.
pub(crate) struct StatxDioAlignInfo {
// Copied from `statx.stx_dio_mem_align`; per statx(2) this is the alignment (in bytes)
// required of user memory buffers for direct I/O -- NOTE(review): confirm against the man page.
stx_dio_mem_align: usize,
// Copied from `statx.stx_dio_offset_align`; per statx(2) this is the alignment (in bytes)
// required of file offsets and I/O lengths for direct I/O -- NOTE(review): confirm.
stx_dio_offset_align: usize,
}

/// Gets Direct IO alignment info through `statx(2)` system call.
/// TODO: Does our machine support this?
///
/// Adapted from https://gist.github.com/problame/1c35cac41b7cd617779f8aae50f97155/revisions.
///
/// The path is resolved relative to the current working directory and symlinks are
/// not followed (`AT_SYMLINK_NOFOLLOW`).
///
/// # Errors
///
/// Returns an error if
/// - `path` contains an interior NUL byte,
/// - the `statx` call itself fails (the OS error is included in the message),
/// - the kernel/filesystem did not fill in one of the requested field groups, or
/// - `path` does not refer to a regular file.
fn statx_align_info<P: AsRef<std::path::Path>>(path: P) -> anyhow::Result<StatxDioAlignInfo> {
    const REQUESTS: [(&str, u32); 2] = [
        ("STATX_BASIC_STATS", libc::STATX_BASIC_STATS),
        ("STATX_DIOALIGN", libc::STATX_DIOALIGN),
    ];

    let mask = REQUESTS.iter().map(|&(_, bit)| bit).fold(0, |acc, bit| acc | bit);

    // `statx` expects a NUL-terminated C string. The raw bytes of a `Path` carry no
    // terminator, so passing them directly would let the kernel read past the buffer.
    let c_path = match std::ffi::CString::new(path.as_ref().as_os_str().as_bytes()) {
        Ok(c_path) => c_path,
        Err(e) => bail!("path contains an interior NUL byte: {}", e),
    };

    let mut statx_buf = MaybeUninit::<Statx>::zeroed();
    // SAFETY: `c_path` is a valid NUL-terminated string that outlives the call, and
    // `statx_buf` points to writable memory large enough for one `statx` struct.
    let status = unsafe {
        libc::statx(
            libc::AT_FDCWD,
            c_path.as_ptr(),
            libc::AT_SYMLINK_NOFOLLOW,
            mask,
            statx_buf.as_mut_ptr(),
        )
    };
    if status != 0 {
        // `statx` returns -1 and reports the actual failure through errno.
        bail!(
            "Error checking alignment: {}",
            std::io::Error::last_os_error()
        );
    }
    // SAFETY: the buffer was zero-initialised (a valid bit pattern for the plain-data
    // `statx` struct) and the successful call above has filled it in.
    let statx_buf = unsafe { statx_buf.assume_init() };

    // `stx_mask` tells which of the requested fields were actually filled in.
    // (`stx_attributes_mask` describes supported *file attributes* and is unrelated.)
    let request_not_fulfilled: Vec<&str> = REQUESTS
        .iter()
        .filter(|&&(_, bit)| statx_buf.stx_mask & bit == 0)
        .map(|&(name, _)| name)
        .collect();
    if !request_not_fulfilled.is_empty() {
        bail!(
            "One or more requested statx attributes not supported: {:?}",
            request_not_fulfilled,
        )
    }

    // Compare the file-type bits as a whole: symlinks and sockets also have the
    // `S_IFREG` bit set, so a plain `& S_IFREG` test would accept them.
    if u32::from(statx_buf.stx_mode) & libc::S_IFMT != libc::S_IFREG {
        bail!("not a regular file: statx.mode={}", statx_buf.stx_mode);
    }

    Ok(StatxDioAlignInfo {
        stx_dio_mem_align: statx_buf.stx_dio_mem_align as usize,
        stx_dio_offset_align: statx_buf.stx_dio_offset_align as usize,
    })
}
}
99 changes: 99 additions & 0 deletions pageserver/src/virtual_file/dio/buffer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use std::ops::{Deref, DerefMut};

use aligned_vec::AVec;
use bytes::buf::UninitSlice;

/// A mutable byte buffer backed by an [`AVec`] whose alignment is chosen at runtime
/// (`aligned_vec::RuntimeAlign`).
///
/// NOTE(review): presumably intended as the buffer type for direct IO, given the
/// enclosing `dio` module -- confirm against callers.
pub struct IoBufferMut(AVec<u8, aligned_vec::RuntimeAlign>);

impl IoBufferMut {
pub fn new(align: usize) -> Self {
IoBufferMut(AVec::new(align))
}

#[inline]
pub fn with_capacity(align: usize, capacity: usize) -> Self {
IoBufferMut(AVec::with_capacity(align, capacity))
}

#[inline]
pub fn capacity(&self) -> usize {
self.0.capacity()
}

#[inline]
pub fn reserve(&mut self, additional: usize) {
self.0.reserve(additional)
}

pub unsafe fn set_len(&mut self, new_len: usize) {
self.0.set_len(new_len)
}
}

/// Read-only access to the bytes currently stored in the buffer.
impl Deref for IoBufferMut {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        // Deref coercion turns `&AVec<u8, _>` into `&[u8]`.
        &self.0
    }
}

/// Mutable access to the bytes currently stored in the buffer.
impl DerefMut for IoBufferMut {
    fn deref_mut(&mut self) -> &mut Self::Target {
        // Deref coercion turns `&mut AVec<u8, _>` into `&mut [u8]`.
        &mut self.0
    }
}

/// Lets an [`IoBufferMut`] be filled through the `bytes::BufMut` interface.
///
/// Writes go into the spare capacity of the backing vector; when there is none left,
/// `chunk_mut` grows the vector by one alignment unit.
unsafe impl bytes::BufMut for IoBufferMut {
    #[inline]
    fn remaining_mut(&self) -> usize {
        // A vector can never have more than isize::MAX bytes
        isize::MAX as usize - self.len()
    }

    /// Marks `cnt` more bytes as initialised.
    ///
    /// # Panics
    ///
    /// Panics if `cnt` exceeds the spare capacity.
    #[inline]
    unsafe fn advance_mut(&mut self, cnt: usize) {
        let len = self.len();
        let remaining = self.0.capacity() - len;

        if remaining < cnt {
            panic_advance(cnt, remaining);
        }

        // Addition will not overflow since the sum is at most the capacity.
        self.set_len(len + cnt);
    }

    /// Returns the uninitialised spare capacity, growing the buffer first if it is full.
    #[inline]
    fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
        if self.capacity() == self.len() {
            self.reserve(self.0.alignment()); // Grow the vec by alignment
        }

        let cap = self.capacity();
        let len = self.len();

        // Take the pointer from the backing vector itself. Going through `DerefMut`
        // would yield a pointer derived from a `&mut [u8]` covering only the first
        // `len` bytes, which must not be used to reach the spare capacity.
        let ptr = self.0.as_mut_ptr();
        // SAFETY: Since `ptr` is valid for `cap` bytes, `ptr.add(len)` must be
        // valid for `cap - len` bytes. The subtraction will not underflow since
        // `len <= cap`.
        unsafe { UninitSlice::from_raw_parts_mut(ptr.add(len), cap - len) }
    }
}

/// Aborts an out-of-bounds `advance_mut` with a descriptive panic.
///
/// `idx` is the number of bytes the caller tried to advance by and `len` is the
/// number of bytes that were actually available.
#[cold]
fn panic_advance(idx: usize, len: usize) -> ! {
    let (available, requested) = (len, idx);
    panic!("advance out of bounds: the len is {} but advancing by {}", available, requested);
}

// #[cold]
// fn panic_does_not_fit(size: usize, nbytes: usize) -> ! {
// panic!(
// "size too large: the integer type can fit {} bytes, but nbytes is {}",
// size, nbytes
// );
// }

0 comments on commit 53b3f9f

Please sign in to comment.