Avoid repeated open for one single file and simplify object reader API on the sync part #1905
Changes from all commits: 2c0d6d3, 6294e8e, b33d6cb, 105be41, d74bba7, 014673e
```diff
@@ -19,10 +19,10 @@
 pub mod local;
 
-use parking_lot::RwLock;
+use parking_lot::{Mutex, RwLock};
 use std::collections::HashMap;
 use std::fmt::{self, Debug};
-use std::io::Read;
+use std::io::{Read, Seek};
 use std::pin::Pin;
 use std::sync::Arc;
```
```diff
@@ -39,27 +39,34 @@ use crate::error::{DataFusionError, Result};
 /// Note that the dynamic dispatch on the reader might
 /// have some performance impacts.
 #[async_trait]
-pub trait ObjectReader: Send + Sync {
+pub trait ObjectReader: Read + Seek + Send {
     /// Get reader for a part [start, start + length] in the file asynchronously
     async fn chunk_reader(&self, start: u64, length: usize)
         -> Result<Box<dyn AsyncRead>>;
 
-    /// Get reader for a part [start, start + length] in the file
-    fn sync_chunk_reader(
-        &self,
-        start: u64,
-        length: usize,
-    ) -> Result<Box<dyn Read + Send + Sync>>;
-
-    /// Get reader for the entire file
-    fn sync_reader(&self) -> Result<Box<dyn Read + Send + Sync>> {
-        self.sync_chunk_reader(0, self.length() as usize)
-    }
```
@rdettai: hi @yjshen! Do you have a pointer to that discussion so I can get the full context? Also, why do you say that it introduces "irrelevant file format details"? A chunk is a chunk; it can apply to any file format 😄

@yjshen: Hi @rdettai, long time no see! I think "chunk" is only a term from the Parquet reader implementation, and in DataFusion we always consume the chunks of one same file sequentially.
```diff
+    /// Set the max number of bytes to be read from the underlying file, until it's reset.
+    /// Imitate [`std::io::Read::take`] since we are not [`Sized`]
+    fn set_limit(&mut self, limit: usize);
 
-    /// Get the size of the file
+    /// Total length of the underlying file. It's currently only used by Parquet reader
+    /// to read metadata from the end.
     fn length(&self) -> u64;
 }
 
+#[derive(Clone)]
+/// A wrapper over ObjectReader that reads file contents out.
+///
+/// Note: We use Arc<Mutex<>> over the reader mainly to reuse the same underlying
+/// file handle while conforming to Parquet ChunkReader's [`parquet::file::reader::ChunkReader::get_read`]
+/// over immutable reference.
+pub struct ObjectReaderWrapper(pub Arc<Mutex<dyn ObjectReader>>);
+
+impl Read for ObjectReaderWrapper {
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        self.0.lock().read(buf)
+    }
+}
+
 /// Represents a specific file or a prefix (folder) that may
 /// require further resolution
 #[derive(Debug)]
```
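To make the new contract concrete, here is a minimal sketch of a synchronous implementor; the `LocalFileReader` name, its fields, and the limit bookkeeping are illustrative assumptions rather than the PR's actual local-store code, and the async `chunk_reader` side is omitted:

```rust
use std::fs::File;
use std::io::{Read, Result, Seek, SeekFrom};

/// Hypothetical implementor: one open handle reused for every read,
/// with an optional byte limit standing in for `Read::take`.
struct LocalFileReader {
    file: File,               // opened once, reused across chunks
    length: u64,              // total file size
    remaining: Option<usize>, // bytes left under the current limit, if any
}

impl Read for LocalFileReader {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        let max = match self.remaining {
            Some(0) => return Ok(0), // limit exhausted: behave like EOF
            Some(n) => buf.len().min(n),
            None => buf.len(),
        };
        let n = self.file.read(&mut buf[..max])?;
        if let Some(rem) = self.remaining.as_mut() {
            *rem -= n; // charge the bytes read against the limit
        }
        Ok(n)
    }
}

impl Seek for LocalFileReader {
    fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
        self.file.seek(pos) // delegate positioning to the OS handle
    }
}

// The remaining trait methods then become trivial:
//   fn set_limit(&mut self, limit: usize) { self.remaining = Some(limit); }
//   fn length(&self) -> u64 { self.length }
```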
```diff
@@ -123,7 +130,7 @@ pub type ListEntryStream =
 
 /// Stream readers opened on a given object store
 pub type ObjectReaderStream =
-    Pin<Box<dyn Stream<Item = Result<Arc<dyn ObjectReader>>> + Send + Sync>>;
+    Pin<Box<dyn Stream<Item = Result<ObjectReaderWrapper>> + Send + Sync>>;
 
 /// A ObjectStore abstracts access to an underlying file/object storage.
 /// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes
```
```diff
@@ -158,7 +165,7 @@ pub trait ObjectStore: Sync + Send + Debug {
     ) -> Result<ListEntryStream>;
 
     /// Get object reader for one file
-    fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>>;
+    fn file_reader(&self, file: SizedFile) -> Result<ObjectReaderWrapper>;
 }
 
 static LOCAL_SCHEME: &str = "file";
```
@yjshen: This is the original purpose of this PR. @liukun4515 initially found the HDFSObjectStore doing many unnecessary opens. The same happens here for the local store, but more expensive.
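As an illustration of the saving, a hedged sketch of reading two byte ranges through the single handle returned by `file_reader`; the function, the ranges, and the error plumbing are made up for the example, and the `ObjectStore`/`SizedFile` types are assumed from this module:

```rust
use std::io::{Read, Seek, SeekFrom};

// Previously each range went through sync_chunk_reader, which could open
// the file again; now the handle is opened once and reused for every range.
fn read_footer_then_header(
    store: &dyn ObjectStore,
    file: SizedFile,
) -> Result<(Vec<u8>, Vec<u8>)> {
    let wrapper = store.file_reader(file)?; // one open for both ranges
    let mut reader = wrapper.0.lock();

    let mut footer = vec![0u8; 8];
    reader.seek(SeekFrom::End(-8))?; // e.g. a fixed-size footer
    reader.read_exact(&mut footer)?;

    let mut header = vec![0u8; 4];
    reader.seek(SeekFrom::Start(0))?; // jump back without reopening
    reader.read_exact(&mut header)?;

    Ok((footer, header))
}
```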
I probably misunderstand something here, and I am sorry I don't quite follow all the comments on this PR. If the issue you are trying to solve is that `File::open` is called too often, would it be possible to "memoize" the open with a mutex inside of the FileReader? Something like the sketch below.
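A minimal sketch of that kind of memoization, with an assumed `FileReader` shape; this is illustrative, not the reviewer's original snippet:

```rust
use std::fs::File;
use std::io::Result;

use parking_lot::Mutex;

/// Assumed shape: the reader remembers its path and opens the
/// file at most once, guarded by a mutex.
struct FileReader {
    path: String,
    file: Mutex<Option<File>>,
}

impl FileReader {
    /// Run `f` against the lazily opened file handle.
    fn with_file<T>(&self, f: impl FnOnce(&mut File) -> Result<T>) -> Result<T> {
        let mut guard = self.file.lock();
        if guard.is_none() {
            *guard = Some(File::open(&self.path)?); // only the first call opens
        }
        f(guard.as_mut().unwrap())
    }
}
```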
@yjshen: Yes, the original problem is about too many opens, and we solved it in our HDFS object store implementation in a way similar to your suggestion above.

However, as I thought more about the ObjectReader API and its use, I realized I had brought one extra layer of abstraction, the "chunk" reader layer, into ObjectReader without benefit. I'd argue the chunk reader is only Parquet related; object readers should only care about things like seeks and reads. Therefore the current PR stops creating new readers from ObjectReader and instead reads through the ObjectReader itself. And if we want the ability to fetch multiple parts of a Parquet file simultaneously, we can utilize the struct PartitionedFile: we could have a max_bytes_per_partition configuration during query planning, combine multiple Parquet files into one partition (as we do now), or split a large Parquet file into many ranges and have each partition handle only several ranges, with the help of apache/arrow-rs#158.

The last several comments are about how to avoid the Mutex in ObjectReaderWrapper. Since we read a Parquet file sequentially within a partition, the Mutex may incur unnecessary overhead. We just have to write it this way to achieve interior mutability, since the ChunkReader API in parquet-rs requires it:
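A sketch of that interior-mutability pattern: `get_read` only receives `&self`, so positioning the shared handle has to go through the `Mutex`. This assumes the parquet-rs trait shape of the time (`get_read(&self, start: u64, length: usize)`) and is an illustration, not necessarily the PR's exact code:

```rust
use std::io::{Seek, SeekFrom};

use parquet::errors::Result as ParquetResult;
use parquet::file::reader::{ChunkReader, Length};

impl Length for ObjectReaderWrapper {
    fn len(&self) -> u64 {
        self.0.lock().length()
    }
}

impl ChunkReader for ObjectReaderWrapper {
    type T = ObjectReaderWrapper;

    // `get_read` takes an immutable reference, so seek and limit
    // mutations are routed through the Mutex.
    fn get_read(&self, start: u64, length: usize) -> ParquetResult<Self::T> {
        {
            let mut reader = self.0.lock();
            reader.seek(SeekFrom::Start(start))?; // position at the chunk
            reader.set_limit(length);             // cap the bytes readable
        }
        Ok(self.clone()) // clones the Arc, sharing the same handle
    }
}
```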