perf: Use mmap-ed memory if possible in Parquet reader #17725

Merged 1 commit on Jul 19, 2024.
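
The gist of the change: when the Parquet source is a local file, the reader now hands out `MemSlice` views backed by a memory mapping instead of copying page bytes into intermediate `Vec<u8>` buffers. A minimal sketch of the underlying idea, using the memmap2 crate that this PR promotes to a workspace dependency (the file name is a placeholder; this is not the PR's actual code):

```rust
use std::fs::File;

use memmap2::Mmap;

fn main() -> std::io::Result<()> {
    let file = File::open("data.parquet")?;
    // SAFETY: mutating a file while it is mapped is undefined behavior;
    // polars guards against this by tracking mapped files (see the
    // registry in crates/polars-io/src/mmap.rs).
    let mmap = unsafe { Mmap::map(&file)? };

    // Bytes can now be sliced straight out of the mapping: no copy into
    // an intermediate Vec<u8>.
    let magic = &mmap[mmap.len() - 4..];
    assert_eq!(magic, b"PAR1".as_slice()); // parquet files end with this magic
    Ok(())
}
```
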
1 change: 1 addition & 0 deletions Cargo.lock

1 change: 1 addition & 0 deletions Cargo.toml
@@ -55,6 +55,7 @@ indexmap = { version = "2", features = ["std"] }
itoa = "1.0.6"
itoap = { version = "1", features = ["simd"] }
memchr = "2.6"
+memmap = { package = "memmap2", version = "0.7" }
multiversion = "0.7"
ndarray = { version = "0.15", default-features = false }
num-traits = "0.2"
4 changes: 2 additions & 2 deletions crates/polars-io/Cargo.toml
@@ -14,7 +14,7 @@ polars-error = { workspace = true }
polars-json = { workspace = true, optional = true }
polars-parquet = { workspace = true, optional = true }
polars-time = { workspace = true, features = [], optional = true }
-polars-utils = { workspace = true }
+polars-utils = { workspace = true, features = ['mmap'] }

ahash = { workspace = true }
arrow = { workspace = true }
@@ -30,7 +30,7 @@ futures = { workspace = true, optional = true }
glob = { version = "0.3" }
itoa = { workspace = true, optional = true }
memchr = { workspace = true }
-memmap = { package = "memmap2", version = "0.7" }
+memmap = { workspace = true }
num-traits = { workspace = true }
object_store = { workspace = true, optional = true }
once_cell = { workspace = true }
11 changes: 11 additions & 0 deletions crates/polars-io/src/mmap.rs
@@ -8,6 +8,7 @@ use memmap::Mmap;
use once_cell::sync::Lazy;
use polars_core::config::verbose;
use polars_error::{polars_bail, PolarsResult};
+use polars_utils::mmap::{MemSlice, MmapSlice};

// Keep track of memory mapped files so we don't write to them while reading
// Use a btree as it uses less memory than a hashmap and this thing never shrinks.
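
(Editorial aside: the registry this comment refers to is hidden by the fold. A sketch of the idea it describes — the names and the value type here are guesses, not the actual polars code:)

```rust
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use std::sync::Mutex;

/// Files are registered here while mapped; writers consult the registry
/// before truncating or overwriting, since mutating a mapped file is UB.
static MAPPED_FILES: Mutex<BTreeMap<PathBuf, u32>> = Mutex::new(BTreeMap::new());

fn register_mmap(path: &Path) {
    *MAPPED_FILES
        .lock()
        .unwrap()
        .entry(path.to_path_buf())
        .or_insert(0) += 1;
}

fn is_safe_to_write(path: &Path) -> bool {
    !MAPPED_FILES.lock().unwrap().contains_key(path)
}
```
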
@@ -143,6 +144,16 @@ impl std::ops::Deref for ReaderBytes<'_> {
    }
}

+impl<'a> ReaderBytes<'a> {
+    pub fn into_mem_slice(self) -> MemSlice {
+        match self {
+            ReaderBytes::Borrowed(v) => MemSlice::from_slice(v),
+            ReaderBytes::Owned(v) => MemSlice::from_vec(v),
+            ReaderBytes::Mapped(v, _) => MemSlice::from_mmap(MmapSlice::new(v)),
+        }
+    }
+}

impl<'a, T: 'a + MmapBytesReader> From<&'a mut T> for ReaderBytes<'a> {
    fn from(m: &'a mut T) -> Self {
        match m.to_bytes() {
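
`into_mem_slice` is the bridge from the borrow-based `ReaderBytes` to the new owned `MemSlice`: borrowed bytes are copied, while owned and memory-mapped bytes presumably move in without a copy. `MemSlice` itself lives in polars-utils behind the new `mmap` feature and is not shown in this diff; a simplified sketch of the shape such a type plausibly has (illustrative, not the real implementation):

```rust
use std::sync::Arc;

use memmap2::Mmap;

/// Illustrative stand-in for polars_utils::mmap::MemSlice: an owned,
/// cheaply clonable view of bytes, whatever storage backs them.
#[derive(Clone)]
pub enum MemSliceSketch {
    Owned(Arc<Vec<u8>>),
    Mapped(Arc<Mmap>),
}

impl std::ops::Deref for MemSliceSketch {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        match self {
            MemSliceSketch::Owned(v) => v,
            MemSliceSketch::Mapped(m) => m,
        }
    }
}
```
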
21 changes: 12 additions & 9 deletions crates/polars-io/src/parquet/read/mmap.rs
@@ -4,11 +4,11 @@ use bytes::Bytes;
#[cfg(feature = "async")]
use polars_core::datatypes::PlHashMap;
use polars_error::PolarsResult;
-use polars_parquet::parquet::read::MemReader;
use polars_parquet::read::{
    column_iter_to_arrays, get_field_columns, ArrayIter, BasicDecompressor, ColumnChunkMetaData,
    PageReader,
};
+use polars_utils::mmap::{MemReader, MemSlice};

/// Store columns data in two scenarios:
/// 1. a local memory mapped file
@@ -21,8 +21,8 @@ use polars_parquet::read::{
/// b. asynchronously fetch them in parallel, for example using object_store
/// c. store the data in this data structure
/// d. when all the data is available deserialize on multiple threads, for example using rayon
-pub enum ColumnStore<'a> {
-    Local(&'a [u8]),
+pub enum ColumnStore {
+    Local(MemSlice),
    #[cfg(feature = "async")]
    Fetched(PlHashMap<u64, Bytes>),
}
@@ -33,7 +33,7 @@ pub(super) fn mmap_columns<'a>(
    store: &'a ColumnStore,
    columns: &'a [ColumnChunkMetaData],
    field_name: &str,
-) -> Vec<(&'a ColumnChunkMetaData, &'a [u8])> {
+) -> Vec<(&'a ColumnChunkMetaData, MemSlice)> {
    get_field_columns(columns, field_name)
        .into_iter()
        .map(|meta| _mmap_single_column(store, meta))
@@ -43,18 +43,18 @@ pub(super) fn mmap_columns<'a>(
fn _mmap_single_column<'a>(
    store: &'a ColumnStore,
    meta: &'a ColumnChunkMetaData,
-) -> (&'a ColumnChunkMetaData, &'a [u8]) {
+) -> (&'a ColumnChunkMetaData, MemSlice) {
    let (start, len) = meta.byte_range();
    let chunk = match store {
-        ColumnStore::Local(file) => &file[start as usize..(start + len) as usize],
+        ColumnStore::Local(mem_slice) => mem_slice.slice(start as usize, (start + len) as usize),
        #[cfg(all(feature = "async", feature = "parquet"))]
        ColumnStore::Fetched(fetched) => {
            let entry = fetched.get(&start).unwrap_or_else(|| {
                panic!(
                    "mmap_columns: column with start {start} must be prefetched in ColumnStore.\n"
                )
            });
-            entry.as_ref()
+            MemSlice::from_slice(entry.as_ref())
        },
    };
    (meta, chunk)
@@ -63,7 +63,7 @@ fn _mmap_single_column<'a>(
// similar to arrow2 serializer, except this accepts a slice instead of a vec.
// this allows us to memory map
pub(super) fn to_deserializer<'a>(
-    columns: Vec<(&ColumnChunkMetaData, &'a [u8])>,
+    columns: Vec<(&ColumnChunkMetaData, MemSlice)>,
    field: Field,
    num_rows: usize,
    chunk_size: Option<usize>,
@@ -73,8 +73,11 @@ pub(super) fn to_deserializer<'a>(
    let (columns, types): (Vec<_>, Vec<_>) = columns
        .into_iter()
        .map(|(column_meta, chunk)| {
+            // Advise fetching the data for the column chunk
+            chunk.prefetch();
+
            let pages = PageReader::new(
-                MemReader::from_slice(chunk),
+                MemReader::new(chunk),
                column_meta,
                std::sync::Arc::new(|_, _| true),
                vec![],
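
Two details worth noting in the hunks above: each column chunk is addressed by a plain `(start, len)` byte range from the row-group metadata, so the local case reduces to a zero-copy `MemSlice::slice`; and `chunk.prefetch()` lets the reader hint the OS before the decode threads touch the pages. A standalone sketch of what such a prefetch amounts to, assuming a Unix target and using memmap2 directly (the real helper lives in polars-utils and is not part of this diff):

```rust
use memmap2::{Advice, Mmap};

/// Cut one column chunk out of a memory-mapped parquet file without
/// copying. `start` and `len` would come from the chunk's byte range in
/// the row-group metadata.
fn column_chunk_bytes(mmap: &Mmap, start: u64, len: u64) -> &[u8] {
    // Purely advisory (madvise(MADV_WILLNEED) on Unix): ask the kernel to
    // fault the pages in ahead of the decoders. A failure only costs page
    // faults later, so it is safe to ignore.
    let _ = mmap.advise(Advice::WillNeed);
    &mmap[start as usize..(start + len) as usize]
}
```
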
10 changes: 7 additions & 3 deletions crates/polars-io/src/parquet/read/read_impl.rs
@@ -8,6 +8,7 @@ use polars_core::prelude::*;
use polars_core::utils::{accumulate_dataframes_vertical, split_df};
use polars_core::POOL;
use polars_parquet::read::{self, ArrayIter, FileMetaData, PhysicalType, RowGroupMetaData};
+use polars_utils::mmap::MemSlice;
use rayon::prelude::*;

#[cfg(feature = "cloud")]
@@ -446,8 +447,7 @@ pub fn read_parquet<R: MmapBytesReader>(
}

    let reader = ReaderBytes::from(&mut reader);
-    let bytes = reader.deref();
-    let store = mmap::ColumnStore::Local(bytes);
+    let store = mmap::ColumnStore::Local(reader.into_mem_slice());

    let dfs = rg_to_dfs(
        &store,
@@ -492,8 +492,12 @@ impl FetchRowGroupsFromMmapReader {
        let reader_bytes = get_reader_bytes(reader_ptr)?;
        Ok(FetchRowGroupsFromMmapReader(reader_bytes))
    }
+
    fn fetch_row_groups(&mut self, _row_groups: Range<usize>) -> PolarsResult<ColumnStore> {
-        Ok(mmap::ColumnStore::Local(self.0.deref()))
+        // @TODO: we can do something smarter here with mmap
+        Ok(mmap::ColumnStore::Local(MemSlice::from_slice(
+            self.0.deref(),
+        )))
    }
}

2 changes: 1 addition & 1 deletion crates/polars-parquet/Cargo.toml
@@ -23,7 +23,7 @@ futures = { workspace = true, optional = true }
num-traits = { workspace = true }
polars-compute = { workspace = true }
polars-error = { workspace = true }
-polars-utils = { workspace = true }
+polars-utils = { workspace = true, features = ["mmap"] }
simdutf8 = { workspace = true }

parquet-format-safe = "0.2"
3 changes: 2 additions & 1 deletion crates/polars-parquet/src/arrow/read/deserialize/mod.rs
@@ -31,12 +31,13 @@ mod utils;
use arrow::array::{Array, DictionaryKey, FixedSizeListArray, ListArray, MapArray};
use arrow::datatypes::{ArrowDataType, Field, IntervalUnit};
use arrow::offset::Offsets;
+use polars_utils::mmap::MemReader;
use simple::page_iter_to_arrays;

pub use self::nested_utils::{init_nested, InitNested, NestedArrayIter, NestedState};
pub use self::struct_::StructIterator;
use super::*;
-use crate::parquet::read::{get_page_iterator as _get_page_iterator, MemReader};
+use crate::parquet::read::get_page_iterator as _get_page_iterator;
use crate::parquet::schema::types::PrimitiveType;

/// Creates a new iterator of compressed pages.

@@ -61,12 +61,11 @@ mod tests {
    use super::iter_to_arrays;
    use crate::parquet::encoding::Encoding;
    use crate::parquet::error::ParquetError;
-    #[allow(unused_imports)]
-    use crate::parquet::fallible_streaming_iterator;
    use crate::parquet::metadata::Descriptor;
    use crate::parquet::page::{DataPage, DataPageHeader, DataPageHeaderV1, Page};
-    use crate::parquet::read::CowBuffer;
    use crate::parquet::schema::types::{PhysicalType, PrimitiveType};
+    #[allow(unused_imports)]
+    use crate::parquet::{fallible_streaming_iterator, CowBuffer};

    #[test]
    fn limit() {
5 changes: 2 additions & 3 deletions crates/polars-parquet/src/arrow/read/row_group.rs
@@ -4,14 +4,13 @@ use arrow::array::Array;
use arrow::datatypes::Field;
use arrow::record_batch::RecordBatchT;
use polars_error::PolarsResult;
+use polars_utils::mmap::MemReader;

use super::{ArrayIter, RowGroupMetaData};
use crate::arrow::read::column_iter_to_arrays;
use crate::parquet::indexes::FilteredPage;
use crate::parquet::metadata::ColumnChunkMetaData;
-use crate::parquet::read::{
-    BasicDecompressor, IndexedPageReader, MemReader, PageMetaData, PageReader,
-};
+use crate::parquet::read::{BasicDecompressor, IndexedPageReader, PageMetaData, PageReader};

/// An [`Iterator`] of [`RecordBatchT`] that (dynamically) adapts a vector of iterators of [`Array`] into
/// an iterator of [`RecordBatchT`].
2 changes: 1 addition & 1 deletion crates/polars-parquet/src/arrow/write/dictionary.rs
@@ -19,9 +19,9 @@ use crate::arrow::write::{slice_nested_leaf, utils};
use crate::parquet::encoding::hybrid_rle::encode;
use crate::parquet::encoding::Encoding;
use crate::parquet::page::{DictPage, Page};
-use crate::parquet::read::CowBuffer;
use crate::parquet::schema::types::PrimitiveType;
use crate::parquet::statistics::ParquetStatistics;
+use crate::parquet::CowBuffer;
use crate::write::DynIter;

pub(crate) fn encode_as_dictionary_optional(
2 changes: 1 addition & 1 deletion crates/polars-parquet/src/arrow/write/utils.rs
@@ -8,9 +8,9 @@ use crate::parquet::encoding::hybrid_rle::encode;
use crate::parquet::encoding::Encoding;
use crate::parquet::metadata::Descriptor;
use crate::parquet::page::{DataPage, DataPageHeader, DataPageHeaderV1, DataPageHeaderV2};
-use crate::parquet::read::CowBuffer;
use crate::parquet::schema::types::PrimitiveType;
use crate::parquet::statistics::ParquetStatistics;
+use crate::parquet::CowBuffer;

fn encode_iter_v1<I: Iterator<Item = bool>>(buffer: &mut Vec<u8>, iter: I) -> PolarsResult<()> {
    buffer.extend_from_slice(&[0; 4]);
34 changes: 34 additions & 0 deletions crates/polars-parquet/src/parquet/mod.rs
@@ -15,7 +15,10 @@ pub mod statistics;
pub mod types;
pub mod write;

+use std::ops::Deref;
+
use parquet_format_safe as thrift_format;
+use polars_utils::mmap::MemSlice;
pub use streaming_decompression::{fallible_streaming_iterator, FallibleStreamingIterator};

pub const HEADER_SIZE: u64 = PARQUET_MAGIC.len() as u64;
@@ -24,3 +27,34 @@ pub const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1'];

/// The number of bytes read at the end of the parquet file on first read
const DEFAULT_FOOTER_READ_SIZE: u64 = 64 * 1024;

+/// A copy-on-write buffer over bytes
+#[derive(Debug, Clone)]
+pub enum CowBuffer {
+    Borrowed(MemSlice),
+    Owned(Vec<u8>),
+}
+
+impl Deref for CowBuffer {
+    type Target = [u8];
+
+    #[inline(always)]
+    fn deref(&self) -> &Self::Target {
+        match self {
+            CowBuffer::Borrowed(v) => v.deref(),
+            CowBuffer::Owned(v) => v.deref(),
+        }
+    }
+}
+
+impl CowBuffer {
+    pub fn to_mut(&mut self) -> &mut Vec<u8> {
+        match self {
+            CowBuffer::Borrowed(v) => {
+                *self = Self::Owned(v.clone().to_vec());
+                self.to_mut()
+            },
+            CowBuffer::Owned(v) => v,
+        }
+    }
+}
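
`to_mut` upgrades lazily: the `Borrowed` arm copies the bytes into an `Owned` vector exactly once, then recurses so the second pass lands in the `Owned` arm. A hypothetical usage sketch (the paths and the `MemSlice::from_vec` constructor are taken from elsewhere in this PR, but the snippet itself is illustrative, not from the diff):

```rust
use polars_parquet::parquet::CowBuffer;
use polars_utils::mmap::MemSlice;

fn main() {
    // Reads stay zero-copy for as long as nothing mutates the buffer.
    let mut buf = CowBuffer::Borrowed(MemSlice::from_vec(vec![1u8, 2, 3]));
    assert_eq!(&buf[..], &[1, 2, 3][..]); // Deref reaches the shared bytes

    // The first mutation copies the bytes into an owned Vec, once.
    buf.to_mut().push(4);
    assert_eq!(&buf[..], &[1, 2, 3, 4][..]);
}
```
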
2 changes: 1 addition & 1 deletion crates/polars-parquet/src/parquet/page/mod.rs
@@ -1,4 +1,4 @@
-use super::read::CowBuffer;
+use super::CowBuffer;
use crate::parquet::compression::Compression;
use crate::parquet::encoding::{get_length, Encoding};
use crate::parquet::error::{ParquetError, ParquetResult};
3 changes: 1 addition & 2 deletions crates/polars-parquet/src/parquet/read/compression.rs
@@ -1,11 +1,10 @@
use parquet_format_safe::DataPageHeaderV2;

use super::page::PageIterator;
-use super::CowBuffer;
use crate::parquet::compression::{self, Compression};
use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::page::{CompressedPage, DataPage, DataPageHeader, DictPage, Page};
-use crate::parquet::FallibleStreamingIterator;
+use crate::parquet::{CowBuffer, FallibleStreamingIterator};

fn decompress_v1(
    compressed: &[u8],
6 changes: 2 additions & 4 deletions crates/polars-parquet/src/parquet/read/mod.rs
@@ -16,10 +16,8 @@ pub use indexes::{read_columns_indexes, read_pages_locations};
pub use metadata::{deserialize_metadata, read_metadata, read_metadata_with_size};
#[cfg(feature = "async")]
pub use page::{get_page_stream, get_page_stream_from_column_start};
-pub use page::{
-    CowBuffer, IndexedPageReader, MemReader, MemReaderSlice, PageFilter, PageIterator,
-    PageMetaData, PageReader,
-};
+pub use page::{IndexedPageReader, PageFilter, PageIterator, PageMetaData, PageReader};
+use polars_utils::mmap::MemReader;
#[cfg(feature = "async")]
pub use stream::read_metadata as read_metadata_async;

@@ -1,9 +1,9 @@
use std::collections::VecDeque;
use std::io::{Seek, SeekFrom};

-use super::memreader::MemReader;
+use polars_utils::mmap::{MemReader, MemSlice};

use super::reader::{finish_page, read_page_header, PageMetaData};
-use super::MemReaderSlice;
use crate::parquet::error::ParquetError;
use crate::parquet::indexes::{FilteredPage, Interval};
use crate::parquet::metadata::{ColumnChunkMetaData, Descriptor};
@@ -44,7 +44,7 @@ fn read_page(
    reader: &mut MemReader,
    start: u64,
    length: usize,
-) -> Result<(ParquetPageHeader, MemReaderSlice), ParquetError> {
+) -> Result<(ParquetPageHeader, MemSlice), ParquetError> {
    // seek to the page
    reader.seek(SeekFrom::Start(start))?;
