Skip to content

Commit

Permalink
perf: use mmap-ed memory if possible in Parquet
Browse files Browse the repository at this point in the history
Using mmap-ed memory on its own (without prefetching, or relying on madvise alone) seems to hurt performance.
The current implementation therefore prefetches into the L2 cache. This gives ~5% improved performance for multithreaded reads and ~10% improved performance for single-threaded reads. All of this testing was done on cold file reads. Warm file reads seem to be faster as well, but the measurements are noisier.

Multi-threaded:

```
Benchmark 1: ./plparbench-before
  Time (mean ± σ):      6.049 s ±  0.031 s    [User: 5.813 s, System: 5.811 s]
  Range (min … max):    6.013 s …  6.086 s    5 runs

Benchmark 2: ./plparbench-after
  Time (mean ± σ):      5.761 s ±  0.020 s    [User: 5.083 s, System: 5.792 s]
  Range (min … max):    5.735 s …  5.788 s    5 runs

Summary
  ./plparbench-after ran
    1.05 ± 0.01 times faster than ./plparbench-before
```

Single-threaded:

```
Benchmark 1: ./plparbench-before
  Time (mean ± σ):     13.601 s ±  0.184 s    [User: 5.295 s, System: 5.206 s]
  Range (min … max):   13.447 s … 13.858 s    5 runs

Benchmark 2: ./plparbench-after
  Time (mean ± σ):     12.398 s ±  0.152 s    [User: 4.862 s, System: 5.134 s]
  Range (min … max):   12.276 s … 12.664 s    5 runs

Summary
  ./plparbench-after ran
    1.10 ± 0.02 times faster than ./plparbench-before
```
  • Loading branch information
coastalwhite committed Jul 19, 2024
1 parent f70b7f9 commit 4570ba7
Show file tree
Hide file tree
Showing 16 changed files with 245 additions and 60 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ indexmap = { version = "2", features = ["std"] }
itoa = "1.0.6"
itoap = { version = "1", features = ["simd"] }
memchr = "2.6"
memmap = { package = "memmap2", version = "0.7" }
multiversion = "0.7"
ndarray = { version = "0.15", default-features = false }
num-traits = "0.2"
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ polars-error = { workspace = true }
polars-json = { workspace = true, optional = true }
polars-parquet = { workspace = true, optional = true }
polars-time = { workspace = true, features = [], optional = true }
polars-utils = { workspace = true }
polars-utils = { workspace = true, features = ['mmap'] }

ahash = { workspace = true }
arrow = { workspace = true }
Expand All @@ -30,7 +30,7 @@ futures = { workspace = true, optional = true }
glob = { version = "0.3" }
itoa = { workspace = true, optional = true }
memchr = { workspace = true }
memmap = { package = "memmap2", version = "0.7" }
memmap = { workspace = true }
num-traits = { workspace = true }
object_store = { workspace = true, optional = true }
once_cell = { workspace = true }
Expand Down
12 changes: 12 additions & 0 deletions crates/polars-io/src/mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use memmap::Mmap;
use once_cell::sync::Lazy;
use polars_core::config::verbose;
use polars_error::{polars_bail, PolarsResult};
use polars_parquet::parquet::read::MemSlice;
use polars_utils::mmap::MmapSlice;

// Keep track of memory mapped files so we don't write to them while reading
// Use a btree as it uses less memory than a hashmap and this thing never shrinks.
Expand Down Expand Up @@ -143,6 +145,16 @@ impl std::ops::Deref for ReaderBytes<'_> {
}
}

impl<'a> ReaderBytes<'a> {
    /// Consume this `ReaderBytes` and turn it into an owned [`MemSlice`].
    ///
    /// Borrowed bytes are copied into the slice, an owned `Vec` is moved in
    /// without copying, and a memory-mapped region is wrapped so the mapping
    /// is kept alive for the lifetime of the returned slice.
    pub fn into_mem_slice(self) -> MemSlice {
        match self {
            ReaderBytes::Borrowed(slice) => MemSlice::from_slice(slice),
            ReaderBytes::Owned(buffer) => MemSlice::from_vec(buffer),
            ReaderBytes::Mapped(mmap, _) => MemSlice::from_mmap(MmapSlice::new(mmap)),
        }
    }
}

impl<'a, T: 'a + MmapBytesReader> From<&'a mut T> for ReaderBytes<'a> {
fn from(m: &'a mut T) -> Self {
match m.to_bytes() {
Expand Down
21 changes: 12 additions & 9 deletions crates/polars-io/src/parquet/read/mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use bytes::Bytes;
#[cfg(feature = "async")]
use polars_core::datatypes::PlHashMap;
use polars_error::PolarsResult;
use polars_parquet::parquet::read::MemReader;
use polars_parquet::parquet::read::{MemReader, MemSlice};
use polars_parquet::read::{
column_iter_to_arrays, get_field_columns, ArrayIter, BasicDecompressor, ColumnChunkMetaData,
PageReader,
Expand All @@ -21,8 +21,8 @@ use polars_parquet::read::{
/// b. asynchronously fetch them in parallel, for example using object_store
/// c. store the data in this data structure
/// d. when all the data is available deserialize on multiple threads, for example using rayon
pub enum ColumnStore<'a> {
Local(&'a [u8]),
pub enum ColumnStore {
Local(MemSlice),
#[cfg(feature = "async")]
Fetched(PlHashMap<u64, Bytes>),
}
Expand All @@ -33,7 +33,7 @@ pub(super) fn mmap_columns<'a>(
store: &'a ColumnStore,
columns: &'a [ColumnChunkMetaData],
field_name: &str,
) -> Vec<(&'a ColumnChunkMetaData, &'a [u8])> {
) -> Vec<(&'a ColumnChunkMetaData, MemSlice)> {
get_field_columns(columns, field_name)
.into_iter()
.map(|meta| _mmap_single_column(store, meta))
Expand All @@ -43,18 +43,18 @@ pub(super) fn mmap_columns<'a>(
fn _mmap_single_column<'a>(
store: &'a ColumnStore,
meta: &'a ColumnChunkMetaData,
) -> (&'a ColumnChunkMetaData, &'a [u8]) {
) -> (&'a ColumnChunkMetaData, MemSlice) {
let (start, len) = meta.byte_range();
let chunk = match store {
ColumnStore::Local(file) => &file[start as usize..(start + len) as usize],
ColumnStore::Local(mem_slice) => mem_slice.slice(start as usize, (start + len) as usize),
#[cfg(all(feature = "async", feature = "parquet"))]
ColumnStore::Fetched(fetched) => {
let entry = fetched.get(&start).unwrap_or_else(|| {
panic!(
"mmap_columns: column with start {start} must be prefetched in ColumnStore.\n"
)
});
entry.as_ref()
MemSlice::from_slice(entry.as_ref())
},
};
(meta, chunk)
Expand All @@ -63,7 +63,7 @@ fn _mmap_single_column<'a>(
// similar to arrow2 serializer, except this accepts a slice instead of a vec.
// this allows us to memory map
pub(super) fn to_deserializer<'a>(
columns: Vec<(&ColumnChunkMetaData, &'a [u8])>,
columns: Vec<(&ColumnChunkMetaData, MemSlice)>,
field: Field,
num_rows: usize,
chunk_size: Option<usize>,
Expand All @@ -73,8 +73,11 @@ pub(super) fn to_deserializer<'a>(
let (columns, types): (Vec<_>, Vec<_>) = columns
.into_iter()
.map(|(column_meta, chunk)| {
// Advise fetching the data for the column chunk
chunk.populate();

let pages = PageReader::new(
MemReader::from_slice(chunk),
MemReader::new(chunk),
column_meta,
std::sync::Arc::new(|_, _| true),
vec![],
Expand Down
10 changes: 7 additions & 3 deletions crates/polars-io/src/parquet/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use arrow::datatypes::ArrowSchemaRef;
use polars_core::prelude::*;
use polars_core::utils::{accumulate_dataframes_vertical, split_df};
use polars_core::POOL;
use polars_parquet::parquet::read::MemSlice;
use polars_parquet::read::{self, ArrayIter, FileMetaData, PhysicalType, RowGroupMetaData};
use rayon::prelude::*;

Expand Down Expand Up @@ -446,8 +447,7 @@ pub fn read_parquet<R: MmapBytesReader>(
}

let reader = ReaderBytes::from(&mut reader);
let bytes = reader.deref();
let store = mmap::ColumnStore::Local(bytes);
let store = mmap::ColumnStore::Local(reader.into_mem_slice());

let dfs = rg_to_dfs(
&store,
Expand Down Expand Up @@ -492,8 +492,12 @@ impl FetchRowGroupsFromMmapReader {
let reader_bytes = get_reader_bytes(reader_ptr)?;
Ok(FetchRowGroupsFromMmapReader(reader_bytes))
}

fn fetch_row_groups(&mut self, _row_groups: Range<usize>) -> PolarsResult<ColumnStore> {
Ok(mmap::ColumnStore::Local(self.0.deref()))
// @TODO: we can something smarter here with mmap
Ok(mmap::ColumnStore::Local(MemSlice::from_slice(
self.0.deref(),
)))
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ futures = { workspace = true, optional = true }
num-traits = { workspace = true }
polars-compute = { workspace = true }
polars-error = { workspace = true }
polars-utils = { workspace = true }
polars-utils = { workspace = true, features = ["mmap"] }
simdutf8 = { workspace = true }

parquet-format-safe = "0.2"
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-parquet/src/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ pub use metadata::{deserialize_metadata, read_metadata, read_metadata_with_size}
#[cfg(feature = "async")]
pub use page::{get_page_stream, get_page_stream_from_column_start};
pub use page::{
CowBuffer, IndexedPageReader, MemReader, MemReaderSlice, PageFilter, PageIterator,
PageMetaData, PageReader,
CowBuffer, IndexedPageReader, MemReader, MemSlice, PageFilter, PageIterator, PageMetaData,
PageReader,
};
#[cfg(feature = "async")]
pub use stream::read_metadata as read_metadata_async;
Expand Down
5 changes: 2 additions & 3 deletions crates/polars-parquet/src/parquet/read/page/indexed_reader.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::collections::VecDeque;
use std::io::{Seek, SeekFrom};

use super::memreader::MemReader;
use super::memreader::{MemReader, MemSlice};
use super::reader::{finish_page, read_page_header, PageMetaData};
use super::MemReaderSlice;
use crate::parquet::error::ParquetError;
use crate::parquet::indexes::{FilteredPage, Interval};
use crate::parquet::metadata::{ColumnChunkMetaData, Descriptor};
Expand Down Expand Up @@ -44,7 +43,7 @@ fn read_page(
reader: &mut MemReader,
start: u64,
length: usize,
) -> Result<(ParquetPageHeader, MemReaderSlice), ParquetError> {
) -> Result<(ParquetPageHeader, MemSlice), ParquetError> {
// seek to the page
reader.seek(SeekFrom::Start(start))?;

Expand Down
Loading

0 comments on commit 4570ba7

Please sign in to comment.