8 changes: 6 additions & 2 deletions datafusion/Cargo.toml
@@ -24,7 +24,7 @@ repository = "https://github.com/apache/arrow-datafusion"
readme = "../README.md"
authors = ["Apache Arrow <dev@arrow.apache.org>"]
license = "Apache-2.0"
keywords = [ "arrow", "query", "sql" ]
keywords = ["arrow", "query", "sql"]
include = [
"benches/*.rs",
"src/**/*.rs",
@@ -48,6 +48,8 @@ pyarrow = ["pyo3", "arrow/pyarrow"]
force_hash_collisions = []
# Used to enable the avro format
avro = ["avro-rs", "num-traits"]
# Used to enable hdfs as remote object store
hdfs = ["fs-hdfs"]

[dependencies]
ahash = "0.7"
@@ -60,7 +62,7 @@ num_cpus = "1.13.0"
chrono = "0.4"
async-trait = "0.1.41"
futures = "0.3"
pin-project-lite= "^0.2.7"
pin-project-lite = "^0.2.7"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs"] }
tokio-stream = "0.1"
log = "^0.4"
@@ -77,6 +79,8 @@ rand = "0.8"
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
num-traits = { version = "0.2", optional = true }
pyo3 = { version = "0.14", optional = true }
fs-hdfs = { version = "^0.1.3", optional = true }
uuid = { version = "^0.8", features = ["v4"] }

[dev-dependencies]
criterion = "0.3"
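Because the new object store sits behind the optional hdfs feature declared above, it is only compiled when requested. As a rough sketch, the feature can be exercised when testing this crate, or enabled by a downstream consumer (the git revision/version is illustrative, not pinned by this diff):

cargo test -p datafusion --features hdfs

# in a consumer's Cargo.toml
datafusion = { git = "https://github.com/apache/arrow-datafusion", features = ["hdfs"] }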
105 changes: 98 additions & 7 deletions datafusion/src/datasource/file_format/parquet.rs
@@ -34,8 +34,6 @@ use parquet::file::reader::Length;
use parquet::file::serialized_reader::SerializedFileReader;
use parquet::file::statistics::Statistics as ParquetStatistics;

use super::FileFormat;
use super::PhysicalPlanConfig;
use crate::arrow::datatypes::{DataType, Field};
use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
use crate::datasource::{create_max_min_accs, get_col_stats};
@@ -49,6 +47,9 @@ use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::{Accumulator, Statistics};
use crate::scalar::ScalarValue;

use super::FileFormat;
use super::PhysicalPlanConfig;

/// The default file exetension of parquet files
pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet";

@@ -322,6 +323,12 @@ impl ChunkReader for ChunkObjectReader {

#[cfg(test)]
mod tests {
use arrow::array::{
BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
TimestampNanosecondArray,
};
use futures::StreamExt;

use crate::{
datasource::object_store::local::{
local_object_reader, local_object_reader_stream, local_unpartitioned_file,
@@ -331,11 +338,6 @@ mod tests {
};

use super::*;
use arrow::array::{
BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
TimestampNanosecondArray,
};
use futures::StreamExt;

#[tokio::test]
async fn read_small_batches() -> Result<()> {
@@ -611,4 +613,93 @@
.await?;
Ok(exec)
}

#[cfg(feature = "hdfs")]
mod test_hdfs {
use crate::datasource::object_store::hdfs::{
hadoop_object_reader, hadoop_object_reader_stream, hadoop_unpartitioned_file,
HadoopFileSystem,
};
use crate::test_util::hdfs::run_hdfs_test;

use super::*;

#[tokio::test]
async fn read_small_batches_from_hdfs() -> Result<()> {
run_hdfs_test("alltypes_plain.parquet".to_string(), |fs, filename_hdfs| {
Box::pin(async move {
let projection = None;
let exec = get_hdfs_exec(
Arc::new(fs),
filename_hdfs.as_str(),
&projection,
2,
None,
)
.await?;
let stream = exec.execute(0).await?;

let tt_batches = stream
.map(|batch| {
let batch = batch.unwrap();
assert_eq!(11, batch.num_columns());
assert_eq!(2, batch.num_rows());
})
.fold(0, |acc, _| async move { acc + 1i32 })
.await;

assert_eq!(tt_batches, 4 /* 8/2 */);

// test metadata
assert_eq!(exec.statistics().num_rows, Some(8));
assert_eq!(exec.statistics().total_byte_size, Some(671));

Ok(())
})
})
.await
}

async fn get_hdfs_exec(
fs: Arc<HadoopFileSystem>,
file_name: &str,
projection: &Option<Vec<usize>>,
batch_size: usize,
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let filename = file_name.to_string();
let format = ParquetFormat::default();
let file_schema = format
.infer_schema(hadoop_object_reader_stream(
fs.clone(),
vec![filename.clone()],
))
.await
.expect("Schema inference");
let statistics = format
.infer_stats(hadoop_object_reader(fs.clone(), filename.clone()))
.await
.expect("Stats inference");
let file_groups = vec![vec![hadoop_unpartitioned_file(
fs.clone(),
filename.clone(),
)]];
let exec = format
.create_physical_plan(
PhysicalPlanConfig {
object_store: fs.clone(),
file_schema,
file_groups,
statistics,
projection: projection.clone(),
batch_size,
limit,
table_partition_cols: vec![],
},
&[],
)
.await?;
Ok(exec)
}
}
}
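For orientation, here is a minimal sketch of driving the same API from outside the test harness. It assumes the hdfs module and PhysicalPlanConfig are publicly reachable at the paths used inside the crate, and that an Arc<HadoopFileSystem> handle for the target NameNode has already been constructed (its constructor is not part of this diff); it simply mirrors the get_hdfs_exec helper above.

use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::{FileFormat, PhysicalPlanConfig};
use datafusion::datasource::object_store::hdfs::{
    hadoop_object_reader, hadoop_object_reader_stream, hadoop_unpartitioned_file,
    HadoopFileSystem,
};
use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;

/// Build a parquet scan over a single file stored in HDFS, mirroring the
/// `get_hdfs_exec` test helper above. `fs` and `path` are assumed inputs.
async fn hdfs_parquet_scan(
    fs: Arc<HadoopFileSystem>,
    path: String,
) -> Result<Arc<dyn ExecutionPlan>> {
    let format = ParquetFormat::default();
    // Read the parquet footer over HDFS to recover the Arrow schema.
    let file_schema = format
        .infer_schema(hadoop_object_reader_stream(fs.clone(), vec![path.clone()]))
        .await?;
    // Collect file-level statistics (row count, byte size) the same way.
    let statistics = format
        .infer_stats(hadoop_object_reader(fs.clone(), path.clone()))
        .await?;
    // A single, unpartitioned file group served by the HDFS object store.
    let file_groups = vec![vec![hadoop_unpartitioned_file(fs.clone(), path)]];
    format
        .create_physical_plan(
            PhysicalPlanConfig {
                object_store: fs.clone(),
                file_schema,
                file_groups,
                statistics,
                projection: None,
                batch_size: 8192,
                limit: None,
                table_partition_cols: vec![],
            },
            &[],
        )
        .await
}

The resulting ExecutionPlan can then be executed partition by partition, as read_small_batches_from_hdfs does with exec.execute(0).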