Merged
34 changes: 19 additions & 15 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -885,24 +885,25 @@ mod roundtrip_tests {
use crate::serde::{AsLogicalPlan, BallistaCodec};
use async_trait::async_trait;
use core::panic;
use datafusion::datafusion_storage::{
object_store::{
local::LocalFileSystem, FileMetaStream, ListEntryStream, ObjectReader,
ObjectStore,
},
SizedFile,
};
use datafusion::datasource::listing::ListingTable;
use datafusion::error::DataFusionError;
use datafusion::{
arrow::datatypes::{DataType, Field, Schema},
datafusion_storage::{
self,
object_store::{
local::LocalFileSystem, FileMetaStream, ListEntryStream, ObjectReader,
ObjectStore,
},
SizedFile,
},
datasource::listing::ListingTable,
logical_plan::{
col, CreateExternalTable, Expr, LogicalPlan, LogicalPlanBuilder, Repartition,
ToDFSchema,
},
prelude::*,
sql::parser::FileType,
};
use std::io;
use std::sync::Arc;

#[derive(Debug)]
@@ -913,8 +914,9 @@ mod roundtrip_tests {
async fn list_file(
&self,
_prefix: &str,
) -> datafusion::error::Result<FileMetaStream> {
Err(DataFusionError::NotImplemented(
) -> datafusion_storage::Result<FileMetaStream> {
Err(io::Error::new(
io::ErrorKind::Unsupported,
"this is only a test object store".to_string(),
))
}
@@ -923,17 +925,19 @@ mod roundtrip_tests {
&self,
_prefix: &str,
_delimiter: Option<String>,
) -> datafusion::error::Result<ListEntryStream> {
Err(DataFusionError::NotImplemented(
) -> datafusion_storage::Result<ListEntryStream> {
Err(io::Error::new(
io::ErrorKind::Unsupported,
"this is only a test object store".to_string(),
))
}

fn file_reader(
&self,
_file: SizedFile,
) -> datafusion::error::Result<Arc<dyn ObjectReader>> {
Err(DataFusionError::NotImplemented(
) -> datafusion_storage::Result<Arc<dyn ObjectReader>> {
Err(io::Error::new(
io::ErrorKind::Unsupported,
"this is only a test object store".to_string(),
))
}
3 changes: 2 additions & 1 deletion ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -27,8 +27,9 @@ use crate::{convert_box_required, convert_required};
use chrono::{TimeZone, Utc};

use datafusion::datafusion_storage::{
object_store::local::LocalFileSystem, FileMeta, PartitionedFile, SizedFile,
object_store::local::LocalFileSystem, FileMeta, SizedFile,
};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::execution::context::ExecutionProps;

use datafusion::physical_plan::file_format::FileScanConfig;
17 changes: 7 additions & 10 deletions ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -34,7 +34,7 @@ use crate::{convert_box_required, convert_required, into_physical_plan, into_req
use datafusion::arrow::compute::SortOptions;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::datafusion_storage::object_store::local::LocalFileSystem;
use datafusion::datafusion_storage::PartitionedFile;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::logical_plan::window_frames::WindowFrame;
use datafusion::physical_plan::aggregates::create_aggregate_expr;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
@@ -941,33 +941,30 @@ mod roundtrip_tests {
use std::sync::Arc;

use crate::serde::{AsExecutionPlan, BallistaCodec};
use datafusion::datafusion_storage::{
object_store::local::LocalFileSystem, PartitionedFile,
};
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::prelude::SessionContext;
use datafusion::{
arrow::{
compute::kernels::sort::SortOptions,
datatypes::{DataType, Field, Schema},
},
datafusion_storage::object_store::local::LocalFileSystem,
datasource::listing::PartitionedFile,
logical_plan::{JoinType, Operator},
physical_plan::{
empty::EmptyExec,
expressions::{binary, col, lit, InListExpr, NotExpr},
expressions::{Avg, Column, PhysicalSortExpr},
file_format::{FileScanConfig, ParquetExec},
filter::FilterExec,
hash_aggregate::{AggregateMode, HashAggregateExec},
hash_join::{HashJoinExec, PartitionMode},
limit::{GlobalLimitExec, LocalLimitExec},
AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr,
sorts::sort::SortExec,
AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, Statistics,
},
prelude::SessionContext,
scalar::ScalarValue,
};

use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
use datafusion::physical_plan::Statistics;

use super::super::super::error::Result;
use super::super::protobuf;
use crate::execution_plans::ShuffleWriterExec;
2 changes: 1 addition & 1 deletion ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -35,7 +35,7 @@ use datafusion::physical_plan::{
Statistics,
};

use datafusion::datafusion_storage::PartitionedFile;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::physical_plan::file_format::FileScanConfig;

use datafusion::physical_plan::expressions::{Count, Literal};
1 change: 0 additions & 1 deletion datafusion-storage/Cargo.toml
@@ -35,7 +35,6 @@ path = "src/lib.rs"
[dependencies]
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
datafusion-common = { path = "../datafusion-common", version = "7.0.0" }
futures = "0.3"
parking_lot = "0.12"
tempfile = "3"
35 changes: 4 additions & 31 deletions datafusion-storage/src/lib.rs
@@ -18,7 +18,10 @@
pub mod object_store;

use chrono::{DateTime, Utc};
use datafusion_common::ScalarValue;
use std::{io, result};

/// Result type for operations that could result in an io error
pub type Result<T> = result::Result<T, io::Error>;

Contributor

I'm wondering if there is any impact from using this instead of DataFusionError::IoError, but I think it's fine; keeping DataFusionError here would of course require keeping the datafusion-common dependency, which would defeat the point of this PR.

Contributor Author

The goal of this PR is to remove the datafusion-common dependency.

When you need a DataFusionError::IoError, simply add .map_err(DataFusionError::IoError), as several places in this PR do.

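A minimal sketch of that conversion pattern, for illustration only (`read_object` and `read_for_planning` are hypothetical names; only the io::Error-based Result alias and `DataFusionError::IoError` come from the codebase):

```rust
use std::{io, result};

// Mirrors the new datafusion-storage alias: io::Error instead of DataFusionError.
type StorageResult<T> = result::Result<T, io::Error>;

// Stand-in for datafusion-common's error enum; only the IoError variant matters here.
#[derive(Debug)]
enum DataFusionError {
    IoError(io::Error),
}

// Hypothetical storage-layer call that fails with a plain io::Error.
fn read_object(path: &str) -> StorageResult<Vec<u8>> {
    Err(io::Error::new(
        io::ErrorKind::Unsupported,
        format!("cannot read {}", path),
    ))
}

// At the DataFusion boundary, wrap the io::Error, exactly as the PR
// does with `.map_err(DataFusionError::IoError)`.
fn read_for_planning(path: &str) -> result::Result<Vec<u8>, DataFusionError> {
    read_object(path).map_err(DataFusionError::IoError)
}

fn main() {
    assert!(matches!(
        read_for_planning("part-0.parquet"),
        Err(DataFusionError::IoError(_))
    ));
}
```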

/// Represents a specific file or a prefix (folder) that may
/// require further resolution
@@ -72,33 +75,3 @@ impl std::fmt::Display for FileMeta {
write!(f, "{} (size: {})", self.path(), self.size())
}
}

#[derive(Debug, Clone)]
/// A single file that should be read, along with its schema, statistics
/// and partition column values that need to be appended to each row.
pub struct PartitionedFile {
/// Path for the file (e.g. URL, filesystem path, etc)
pub file_meta: FileMeta,
/// Values of partition columns to be appended to each row
pub partition_values: Vec<ScalarValue>,
// We may include row group range here for a more fine-grained parallel execution
}

impl PartitionedFile {
/// Create a simple file without metadata or partition
pub fn new(path: String, size: u64) -> Self {
Self {
file_meta: FileMeta {
sized_file: SizedFile { path, size },
last_modified: None,
},
partition_values: vec![],
}
}
}

impl std::fmt::Display for PartitionedFile {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self.file_meta)
}
}
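Note for reviewers tracking the move: `PartitionedFile` keeps this exact shape, it just lives in `datafusion::datasource::listing` now. A hedged usage sketch, assuming the struct and its `new` constructor moved over verbatim:

```rust
// New import path after this PR (was datafusion_storage::PartitionedFile):
use datafusion::datasource::listing::PartitionedFile;

fn main() {
    // `new` still builds an entry with empty partition_values and no mtime.
    let file = PartitionedFile::new("data/part-0.parquet".to_string(), 1024);
    assert!(file.partition_values.is_empty());
    println!("{}", file); // Display delegates to the inner FileMeta
}
```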
27 changes: 13 additions & 14 deletions datafusion-storage/src/object_store/local.rs
@@ -18,15 +18,14 @@
//! Object store that represents the Local File System.

use std::fs::{self, File, Metadata};
use std::io;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;

use async_trait::async_trait;
use futures::{stream, AsyncRead, StreamExt};

use datafusion_common::{DataFusionError, Result};

use crate::{FileMeta, PartitionedFile, SizedFile};
use crate::{FileMeta, Result, SizedFile};

use super::{
FileMetaStream, ListEntryStream, ObjectReader, ObjectReaderStream, ObjectStore,
@@ -131,7 +130,10 @@ async fn list_all(prefix: String) -> Result<FileMetaStream> {
files.push(get_meta(child_path.to_owned(), metadata))
}
} else {
return Err(DataFusionError::Plan("Invalid path".to_string()));
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Invalid path".to_string(),
));
}
}
Ok(files)
@@ -171,22 +173,19 @@ pub fn local_object_reader_stream(files: Vec<String>) -> ObjectReaderStream {
/// Helper method to convert a file location to a `LocalFileReader`
pub fn local_object_reader(file: String) -> Arc<dyn ObjectReader> {
LocalFileSystem
.file_reader(local_unpartitioned_file(file).file_meta.sized_file)
.file_reader(local_unpartitioned_file(file).sized_file)
.expect("File not found")
}

/// Helper method to fetch the file size and date at given path and create a `FileMeta`
pub fn local_unpartitioned_file(file: String) -> PartitionedFile {
pub fn local_unpartitioned_file(file: String) -> FileMeta {
let metadata = fs::metadata(&file).expect("Local file metadata");
PartitionedFile {
file_meta: FileMeta {
sized_file: SizedFile {
size: metadata.len(),
path: file,
},
last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
FileMeta {
sized_file: SizedFile {
size: metadata.len(),
path: file,
},
partition_values: vec![],
last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
}
}

7 changes: 1 addition & 6 deletions datafusion-storage/src/object_store/mod.rs
@@ -27,17 +27,12 @@ use std::sync::Arc;
use async_trait::async_trait;
use futures::{AsyncRead, Stream, StreamExt};

use crate::{FileMeta, ListEntry, PartitionedFile, SizedFile};
use datafusion_common::Result;
use crate::{FileMeta, ListEntry, Result, SizedFile};

/// Stream of files listed from object store
pub type FileMetaStream =
Pin<Box<dyn Stream<Item = Result<FileMeta>> + Send + Sync + 'static>>;

/// Stream of files get listed from object store
pub type PartitionedFileStream =
Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;

/// Stream of list entries obtained from object store
pub type ListEntryStream =
Pin<Box<dyn Stream<Item = Result<ListEntry>> + Send + Sync + 'static>>;
4 changes: 2 additions & 2 deletions datafusion/src/datasource/file_format/avro.rs
@@ -76,13 +76,13 @@ impl FileFormat for AvroFormat {
mod tests {
use crate::{
datafusion_storage::object_store::local::{
local_object_reader, local_object_reader_stream, local_unpartitioned_file,
LocalFileSystem,
local_object_reader, local_object_reader_stream, LocalFileSystem,
},
physical_plan::collect,
};

use super::*;
use crate::datasource::listing::local_unpartitioned_file;
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::prelude::{SessionConfig, SessionContext};
use arrow::array::{
6 changes: 3 additions & 3 deletions datafusion/src/datasource/file_format/csv.rs
@@ -138,11 +138,11 @@ mod tests {
use arrow::array::StringArray;

use super::*;
use crate::datasource::listing::local_unpartitioned_file;
use crate::prelude::{SessionConfig, SessionContext};
use crate::{
datafusion_storage::object_store::local::{
local_object_reader, local_object_reader_stream, local_unpartitioned_file,
LocalFileSystem,
local_object_reader, local_object_reader_stream, LocalFileSystem,
},
datasource::file_format::FileScanConfig,
physical_plan::collect,
@@ -271,7 +271,7 @@ mod tests {
let exec = format
.create_physical_plan(
FileScanConfig {
object_store: Arc::new(LocalFileSystem {}),
object_store: Arc::new(LocalFileSystem),
file_schema,
file_groups,
statistics,
5 changes: 2 additions & 3 deletions datafusion/src/datasource/file_format/json.rs
@@ -103,10 +103,9 @@ mod tests {
use crate::prelude::{SessionConfig, SessionContext};
use crate::{
datafusion_storage::object_store::local::{
local_object_reader, local_object_reader_stream, local_unpartitioned_file,
LocalFileSystem,
local_object_reader, local_object_reader_stream, LocalFileSystem,
},
datasource::file_format::FileScanConfig,
datasource::{file_format::FileScanConfig, listing::local_unpartitioned_file},
physical_plan::collect,
};

6 changes: 4 additions & 2 deletions datafusion/src/datasource/file_format/parquet.rs
@@ -89,6 +89,7 @@ impl FileFormat for ParquetFormat {

async fn infer_schema(&self, readers: ObjectReaderStream) -> Result<SchemaRef> {
let merged_schema = readers
.map_err(DataFusionError::IoError)
.try_fold(Schema::empty(), |acc, reader| async {
let next_schema = fetch_schema(reader);
Schema::try_merge([acc, next_schema?])
@@ -351,16 +352,17 @@ impl ChunkReader for ChunkObjectReader {
fn get_read(&self, start: u64, length: usize) -> ParquetResult<Self::T> {
self.0
.sync_chunk_reader(start, length)
.map_err(DataFusionError::IoError)
.map_err(|e| ParquetError::ArrowError(e.to_string()))
}
}

#[cfg(test)]
mod tests {
use crate::datasource::listing::local_unpartitioned_file;
use crate::physical_plan::collect;
use datafusion_storage::object_store::local::{
local_object_reader, local_object_reader_stream, local_unpartitioned_file,
LocalFileSystem,
local_object_reader, local_object_reader_stream, LocalFileSystem,
};

use super::*;
13 changes: 8 additions & 5 deletions datafusion/src/datasource/listing/helpers.rs
@@ -29,6 +29,7 @@ use arrow::{
record_batch::RecordBatch,
};
use chrono::{TimeZone, Utc};
use datafusion_common::DataFusionError;
use futures::{
stream::{self},
StreamExt, TryStreamExt,
@@ -44,10 +45,8 @@ use crate::{
scalar::ScalarValue,
};

use datafusion_storage::{
object_store::{ObjectStore, PartitionedFileStream},
FileMeta, PartitionedFile, SizedFile,
};
use super::{PartitionedFile, PartitionedFileStream};
use datafusion_storage::{object_store::ObjectStore, FileMeta, SizedFile};

const FILE_SIZE_COLUMN_NAME: &str = "_df_part_file_size_";
const FILE_PATH_COLUMN_NAME: &str = "_df_part_file_path_";
@@ -234,7 +233,11 @@ pub async fn pruned_partition_list(
// store if we switch to a streaming-style pruning of the files. For instance S3 lists
// 1000 items at a time so batches of 1000 would be ideal with S3 as store.
.chunks(1024)
.map(|v| v.into_iter().collect::<Result<Vec<_>>>())
.map(|v| {
v.into_iter()
.collect::<datafusion_storage::Result<Vec<_>>>()
})
.map_err(DataFusionError::IoError)
.map(move |metas| paths_to_batch(table_partition_cols, &stream_path, &metas?))
.try_collect()
.await?;