4 changes: 2 additions & 2 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -850,7 +850,7 @@ mod roundtrip_tests {
use core::panic;
use datafusion::datasource::listing::ListingTable;
use datafusion::datasource::object_store::{
FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, SizedFile,
FileMetaStream, ListEntryStream, ObjectReaderWrapper, ObjectStore, SizedFile,
};
use datafusion::error::DataFusionError;
use datafusion::{
@@ -895,7 +895,7 @@ mod roundtrip_tests {
fn file_reader(
&self,
_file: SizedFile,
) -> datafusion::error::Result<Arc<dyn ObjectReader>> {
) -> datafusion::error::Result<ObjectReaderWrapper> {
Err(DataFusionError::NotImplemented(
"this is only a test object store".to_string(),
))
6 changes: 3 additions & 3 deletions datafusion/src/datasource/file_format/avro.rs
@@ -27,7 +27,7 @@ use futures::StreamExt;

use super::FileFormat;
use crate::avro_to_arrow::read_avro_schema_from_reader;
use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
use crate::datasource::object_store::{ObjectReaderStream, ObjectReaderWrapper};
use crate::error::Result;
use crate::logical_plan::Expr;
use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
@@ -49,15 +49,15 @@ impl FileFormat for AvroFormat {
async fn infer_schema(&self, mut readers: ObjectReaderStream) -> Result<SchemaRef> {
let mut schemas = vec![];
while let Some(obj_reader) = readers.next().await {
let mut reader = obj_reader?.sync_reader()?;
let mut reader = obj_reader?;
let schema = read_avro_schema_from_reader(&mut reader)?;
schemas.push(schema);
}
let merged_schema = Schema::try_merge(schemas)?;
Ok(Arc::new(merged_schema))
}

async fn infer_stats(&self, _reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
async fn infer_stats(&self, _reader: ObjectReaderWrapper) -> Result<Statistics> {
Ok(Statistics::default())
}

6 changes: 3 additions & 3 deletions datafusion/src/datasource/file_format/csv.rs
@@ -26,7 +26,7 @@ use async_trait::async_trait;
use futures::StreamExt;

use super::FileFormat;
use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
use crate::datasource::object_store::{ObjectReaderStream, ObjectReaderWrapper};
use crate::error::Result;
use crate::logical_plan::Expr;
use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
@@ -98,7 +98,7 @@ impl FileFormat for CsvFormat {
let mut records_to_read = self.schema_infer_max_rec.unwrap_or(std::usize::MAX);

while let Some(obj_reader) = readers.next().await {
let mut reader = obj_reader?.sync_reader()?;
let mut reader = obj_reader?;
let (schema, records_read) = arrow::csv::reader::infer_reader_schema(
&mut reader,
self.delimiter,
@@ -119,7 +119,7 @@
Ok(Arc::new(merged_schema))
}

async fn infer_stats(&self, _reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
async fn infer_stats(&self, _reader: ObjectReaderWrapper) -> Result<Statistics> {
Ok(Statistics::default())
}

6 changes: 3 additions & 3 deletions datafusion/src/datasource/file_format/json.rs
@@ -30,7 +30,7 @@ use futures::StreamExt;

use super::FileFormat;
use super::FileScanConfig;
use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
use crate::datasource::object_store::{ObjectReaderStream, ObjectReaderWrapper};
use crate::error::Result;
use crate::logical_plan::Expr;
use crate::physical_plan::file_format::NdJsonExec;
@@ -64,7 +64,7 @@ impl FileFormat for JsonFormat {
let mut schemas = Vec::new();
let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);
while let Some(obj_reader) = readers.next().await {
let mut reader = BufReader::new(obj_reader?.sync_reader()?);
let mut reader = BufReader::new(obj_reader?);
let iter = ValueIter::new(&mut reader, None);
let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
let should_take = records_to_read > 0;
@@ -81,7 +81,7 @@
Ok(Arc::new(schema))
}

async fn infer_stats(&self, _reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
async fn infer_stats(&self, _reader: ObjectReaderWrapper) -> Result<Statistics> {
Ok(Statistics::default())
}

5 changes: 3 additions & 2 deletions datafusion/src/datasource/file_format/mod.rs
@@ -32,9 +32,10 @@ use crate::logical_plan::Expr;
use crate::physical_plan::file_format::FileScanConfig;
use crate::physical_plan::{ExecutionPlan, Statistics};

use crate::datasource::object_store::ObjectReaderWrapper;
use async_trait::async_trait;

use super::object_store::{ObjectReader, ObjectReaderStream};
use super::object_store::ObjectReaderStream;

/// This trait abstracts all the file format specific implementations
/// from the `TableProvider`. This helps code re-utilization across
@@ -53,7 +54,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {

/// Infer the statistics for the provided object. The cost and accuracy of the
/// estimated statistics might vary greatly between file formats.
async fn infer_stats(&self, reader: Arc<dyn ObjectReader>) -> Result<Statistics>;
async fn infer_stats(&self, reader: ObjectReaderWrapper) -> Result<Statistics>;

/// Take a list of files and convert it to the appropriate executor
/// according to this file format.
35 changes: 16 additions & 19 deletions datafusion/src/datasource/file_format/parquet.rs
@@ -18,7 +18,7 @@
//! Parquet format abstractions

use std::any::Any;
use std::io::Read;
use std::io::SeekFrom;
use std::sync::Arc;

use arrow::datatypes::Schema;
@@ -40,7 +40,7 @@ use crate::arrow::array::{
BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array,
};
use crate::arrow::datatypes::{DataType, Field};
use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
use crate::datasource::object_store::{ObjectReaderStream, ObjectReaderWrapper};
use crate::datasource::{create_max_min_accs, get_col_stats};
use crate::error::DataFusionError;
use crate::error::Result;
@@ -98,7 +98,7 @@ impl FileFormat for ParquetFormat {
Ok(Arc::new(merged_schema))
}

async fn infer_stats(&self, reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
async fn infer_stats(&self, reader: ObjectReaderWrapper) -> Result<Statistics> {
let stats = fetch_statistics(reader)?;
Ok(stats)
}
@@ -268,19 +268,17 @@ fn summarize_min_max(
}

/// Read and parse the schema of the Parquet file at location `path`
fn fetch_schema(object_reader: Arc<dyn ObjectReader>) -> Result<Schema> {
let obj_reader = ChunkObjectReader(object_reader);
let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?);
fn fetch_schema(object_reader: ObjectReaderWrapper) -> Result<Schema> {
let file_reader = Arc::new(SerializedFileReader::new(object_reader)?);
let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
let schema = arrow_reader.get_schema()?;

Ok(schema)
}

/// Read and parse the statistics of the Parquet file at location `path`
fn fetch_statistics(object_reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
let obj_reader = ChunkObjectReader(object_reader);
let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?);
fn fetch_statistics(object_reader: ObjectReaderWrapper) -> Result<Statistics> {
let file_reader = Arc::new(SerializedFileReader::new(object_reader)?);
let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
let schema = arrow_reader.get_schema()?;
let num_fields = schema.fields().len();
@@ -336,22 +334,21 @@ fn fetch_statistics(object_reader: Arc<dyn ObjectReader>) -> Result<Statistics>
Ok(statistics)
}

/// A wrapper around the object reader to make it implement `ChunkReader`
pub struct ChunkObjectReader(pub Arc<dyn ObjectReader>);

impl Length for ChunkObjectReader {
impl Length for ObjectReaderWrapper {
fn len(&self) -> u64 {
self.0.length()
self.0.lock().length()
}
}

impl ChunkReader for ChunkObjectReader {
type T = Box<dyn Read + Send + Sync>;
impl ChunkReader for ObjectReaderWrapper {
type T = Self;

fn get_read(&self, start: u64, length: usize) -> ParquetResult<Self::T> {
self.0
.sync_chunk_reader(start, length)
.map_err(|e| ParquetError::ArrowError(e.to_string()))
let mut r = self.0.lock();
r.seek(SeekFrom::Start(start))
.map_err(|e| ParquetError::ArrowError(e.to_string()))?;
r.set_limit(length);
Ok(self.clone())
}
}

59 changes: 39 additions & 20 deletions datafusion/src/datasource/object_store/local.rs
@@ -23,9 +23,11 @@ use std::sync::Arc;

use async_trait::async_trait;
use futures::{stream, AsyncRead, StreamExt};
use parking_lot::Mutex;

use crate::datasource::object_store::{
FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore,
FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectReaderWrapper,
ObjectStore,
};
use crate::datasource::PartitionedFile;
use crate::error::{DataFusionError, Result};
@@ -55,18 +57,46 @@ impl ObjectStore for LocalFileSystem {
todo!()
}

fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
Ok(Arc::new(LocalFileReader::new(file)?))
fn file_reader(&self, file: SizedFile) -> Result<ObjectReaderWrapper> {
Ok(ObjectReaderWrapper(Arc::new(Mutex::new(
LocalFileReader::new(file)?,
))))
}
}

struct LocalFileReader {
file: SizedFile,
r: BufReader<File>,
total_size: u64,
limit: usize,
}

impl LocalFileReader {
fn new(file: SizedFile) -> Result<Self> {
Ok(Self { file })
Ok(Self {
r: BufReader::new(File::open(file.path)?),
total_size: file.size,
limit: file.size as usize,
})
}
}

impl Read for LocalFileReader {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
// read from current position to limit
if self.limit > 0 {
let read_len = std::cmp::min(self.limit, buf.len());
let read_len = self.r.read(&mut buf[..read_len])?;
self.limit -= read_len;
Ok(read_len)
} else {
Ok(0)
}
}
}

impl Seek for LocalFileReader {
fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
self.r.seek(pos)
}
}

@@ -82,23 +112,12 @@ impl ObjectReader for LocalFileReader {
)
}

fn sync_chunk_reader(
&self,
start: u64,
length: usize,
) -> Result<Box<dyn Read + Send + Sync>> {
// A new file descriptor is opened for each chunk reader.
// This is okay because chunks are usually fairly large.
let mut file = File::open(&self.file.path)?;
[Review thread anchored on the File::open call above; the removed function body continues after the thread.]

Member Author (@yjshen):

This is the original purpose of this PR. @liukun4515 initially found the HDFSObjectStore with many unnecessary opens. Same here for the local store, but more expensive.

Contributor:

I probably misunderstand something here, and I am sorry I don't quite follow all the comments on this PR.

If the issue you are trying to solve is that File::open is called too often, would it be possible to "memoize" the open here with a mutex inside of the FileReader?

Something like:

struct LocalFileReader {
    ...
    /// Keep the open file descriptor to avoid reopening it
    cache: Mutex<Option<Box<dyn Read + Send + Sync + Clone>>>,
}

impl LocalFileReader {
    ...
    fn sync_chunk_reader(
        &self,
        start: u64,
        length: usize,
    ) -> Result<Box<dyn Read + Send + Sync>> {
        let mut cache = self.cache.lock();
        if let Some(cache) = cache {
            return Ok(cache.clone());
        };
        *cache = File::open(...);
        return cache.clone();
    }
}
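
A compilable variant of this memoization idea might look roughly like the sketch below. It targets the pre-refactor sync_chunk_reader signature, keeps a lazily opened handle behind a parking_lot::Mutex, and copies each requested chunk eagerly into a Cursor so the returned reader does not depend on the shared seek position; the handle field and the eager buffering are illustrative assumptions, not part of the PR (SizedFile and Result are the surrounding crate's types):

use std::fs::File;
use std::io::{Cursor, Read, Seek, SeekFrom};

use parking_lot::Mutex;

struct LocalFileReader {
    file: SizedFile,
    /// Lazily opened handle, reused across chunk reads to avoid reopening the file.
    handle: Mutex<Option<File>>,
}

impl LocalFileReader {
    fn sync_chunk_reader(
        &self,
        start: u64,
        length: usize,
    ) -> Result<Box<dyn Read + Send + Sync>> {
        let mut guard = self.handle.lock();
        if guard.is_none() {
            *guard = Some(File::open(&self.file.path)?);
        }
        let file = guard.as_mut().unwrap();
        // Seek and copy the requested range while the lock is held, so the
        // reader handed back is independent of the shared file cursor.
        file.seek(SeekFrom::Start(start))?;
        let mut buf = Vec::with_capacity(length);
        file.by_ref().take(length as u64).read_to_end(&mut buf)?;
        Ok(Box::new(Cursor::new(buf)))
    }
}

The eager copy costs memory proportional to the chunk size, but it keeps concurrent chunk readers from interfering with each other's seek position while still opening the file only once.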

Member Author (@yjshen):

Yes, the original problem is about too many opens, and we do solve it in our HDFS object store implementation in a way similar to your suggestion above.

However, as I think more about the ObjectReader API and its use, I believe I've brought one extra layer of abstraction, the "chunk" reader layer, into ObjectReader without benefit. I would prefer the chunk reader to be Parquet-specific only, with object readers caring only about seeks and reads.

Therefore the current PR stops creating new readers from ObjectReader and instead reads directly through the ObjectReader itself. If we want the ability to fetch multiple parts of a Parquet file simultaneously, we can utilize the PartitionedFile struct:

/// A single file that should be read, along with its schema, statistics
/// and partition column values that need to be appended to each row.
pub struct PartitionedFile {
    /// Path for the file (e.g. URL, filesystem path, etc)
    pub file_meta: FileMeta,
    /// Values of partition columns to be appended to each row
    pub partition_values: Vec<ScalarValue>,
    // We may include row group range here for a more fine-grained parallel execution
}

We could have a max_bytes_per_partition configuration during query planning: combine multiple Parquet files into one partition (as we do now), or split a large Parquet file into many ranges and have each partition handle only a few ranges, with the help of apache/arrow-rs#158.

The last several comments are about how to avoid the Mutex in ObjectReaderWrapper. Since we read a Parquet file sequentially within a partition, the Mutex may incur unnecessary overhead; we only have to write it this way to achieve interior mutability, because the ChunkReader API in parquet-rs requires it:

pub trait ChunkReader: Length + Send + Sync {
    type T: Read + Send;
    /// Get a serially readable slice of the current reader.
    /// This should fail if the slice exceeds the current bounds.
    fn get_read(&self, start: u64, length: usize) -> Result<Self::T>; // `&self` together with `Send` imposes the need for interior mutability
}

file.seek(SeekFrom::Start(start))?;

let file = BufReader::new(file.take(length as u64));

Ok(Box::new(file))
fn set_limit(&mut self, limit: usize) {
self.limit = limit;
}

fn length(&self) -> u64 {
self.file.size
self.total_size
}
}

@@ -167,7 +186,7 @@ pub fn local_object_reader_stream(files: Vec<String>) -> ObjectReaderStream {
}

/// Helper method to convert a file location to a `LocalFileReader`
pub fn local_object_reader(file: String) -> Arc<dyn ObjectReader> {
pub fn local_object_reader(file: String) -> ObjectReaderWrapper {
LocalFileSystem
.file_reader(local_unpartitioned_file(file).file_meta.sized_file)
.expect("File not found")
41 changes: 24 additions & 17 deletions datafusion/src/datasource/object_store/mod.rs
@@ -19,10 +19,10 @@

pub mod local;

use parking_lot::RwLock;
use parking_lot::{Mutex, RwLock};
use std::collections::HashMap;
use std::fmt::{self, Debug};
use std::io::Read;
use std::io::{Read, Seek};
use std::pin::Pin;
use std::sync::Arc;

@@ -39,27 +39,34 @@ use crate::error::{DataFusionError, Result};
/// Note that the dynamic dispatch on the reader might
/// have some performance impacts.
#[async_trait]
pub trait ObjectReader: Send + Sync {
pub trait ObjectReader: Read + Seek + Send {
/// Get reader for a part [start, start + length] in the file asynchronously
async fn chunk_reader(&self, start: u64, length: usize)
-> Result<Box<dyn AsyncRead>>;

/// Get reader for a part [start, start + length] in the file
fn sync_chunk_reader(
&self,
start: u64,
length: usize,
) -> Result<Box<dyn Read + Send + Sync>>;

/// Get reader for the entire file
fn sync_reader(&self) -> Result<Box<dyn Read + Send + Sync>> {
self.sync_chunk_reader(0, self.length() as usize)
}
[Review thread on the removed sync_chunk_reader / sync_reader methods.]

Member Author (@yjshen), Mar 2, 2022:

After discussions with @houqp and @richox, we agreed that the extra chunk semantics introduced in ObjectReader bring irrelevant file format details into object stores and incur needless complexity. Hence the API simplification.

Contributor (@rdettai), Mar 3, 2022:

Hi @yjshen! Do you have a pointer to that discussion so I can get the full context? Also, why do you say that it introduces "irrelevant file format details"? A chunk is a chunk; it can apply to any file format 😄

Member Author (@yjshen):

Hi @rdettai, long time no see! I think "chunk" is only a term from the Parquet reader implementation, and we always read chunks from the same file sequentially in DataFusion.

/// Set the max number of bytes to be read from the underlying file, until it's reset.
/// Imitate [`std::io::Read::take`] since we are not [`Sized`]
fn set_limit(&mut self, limit: usize);

/// Get the size of the file
/// Total length of the underlying file. It's currently only used by Parquet reader
/// to read metadata from the end.
fn length(&self) -> u64;
}

#[derive(Clone)]
/// A wrapper over ObjectReader that reads file contents out.
///
/// Note: We use Arc<Mutex<>> over the reader mainly to reuse the same underlying
/// file handle while conforming to Parquet ChunkReader's [`parquet::file::reader::ChunkReader::get_read`]
/// over immutable reference.
pub struct ObjectReaderWrapper(pub Arc<Mutex<dyn ObjectReader>>);

impl Read for ObjectReaderWrapper {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.0.lock().read(buf)
}
}
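
For illustration, a minimal usage sketch of the new reader API (not part of this diff; the read_range helper is hypothetical): it seeks the shared inner reader, bounds the next read with set_limit, and then reads through the wrapper itself, mirroring what the Parquet ChunkReader implementation above does.

use std::io::{Read, SeekFrom};

/// Hypothetical helper: read `length` bytes starting at offset `start`.
fn read_range(
    store: &dyn ObjectStore,
    file: SizedFile,
    start: u64,
    length: usize,
) -> Result<Vec<u8>> {
    let mut reader = store.file_reader(file)?;
    {
        // Position the shared inner reader and bound the next read. The guard
        // must be dropped before reading, because ObjectReaderWrapper::read
        // locks the same mutex again.
        let mut inner = reader.0.lock();
        inner.seek(SeekFrom::Start(start))?;
        inner.set_limit(length);
    }
    let mut buf = Vec::with_capacity(length);
    // The limited inner reader reports EOF once `length` bytes have been read.
    reader.read_to_end(&mut buf)?;
    Ok(buf)
}

Because clones of the wrapper share one Mutex and one seek position, range reads like this have to be issued sequentially, which matches how DataFusion consumes a Parquet file within a partition (see the review discussion above).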

/// Represents a specific file or a prefix (folder) that may
/// require further resolution
#[derive(Debug)]
@@ -123,7 +130,7 @@ pub type ListEntryStream =

/// Stream readers opened on a given object store
pub type ObjectReaderStream =
Pin<Box<dyn Stream<Item = Result<Arc<dyn ObjectReader>>> + Send + Sync>>;
Pin<Box<dyn Stream<Item = Result<ObjectReaderWrapper>> + Send + Sync>>;

/// A ObjectStore abstracts access to an underlying file/object storage.
/// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes
@@ -158,7 +165,7 @@ pub trait ObjectStore: Sync + Send + Debug {
) -> Result<ListEntryStream>;

/// Get object reader for one file
fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>>;
fn file_reader(&self, file: SizedFile) -> Result<ObjectReaderWrapper>;
}

static LOCAL_SCHEME: &str = "file";