
Commit 0821202

Extend source to enable read from remote storage

1 parent 0125451

File tree: 20 files changed, +1001 −507 lines

ballista/rust/core/src/serde/logical_plan/from_proto.rs

Lines changed: 3 additions & 1 deletion

@@ -159,7 +159,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
                 LogicalPlanBuilder::scan_parquet_with_name(
                     &scan.path,
                     projection,
-                    24,
+                    create_datafusion_context_concurrency(24),
                     &scan.table_name,
                 )? //TODO concurrency
                 .build()

@@ -1100,6 +1100,8 @@ impl TryInto<Field> for &protobuf::Field {
     }
 }

+use crate::utils::create_datafusion_context_concurrency;
+use datafusion::physical_plan::datetime_expressions::to_timestamp;
 use datafusion::physical_plan::{aggregates, windows};
 use datafusion::prelude::{
     array, date_part, date_trunc, length, lower, ltrim, md5, rtrim, sha224, sha256,
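
The pattern that repeats through this commit: call sites that passed a bare concurrency integer now pass an ExecutionContext, so a scan carries a full context (and with it, object stores) rather than just a thread count. A minimal sketch of the new builder signature as used in the hunk above; the path and table name are hypothetical:

    use ballista_core::utils::create_datafusion_context_concurrency;
    use datafusion::error::Result;
    use datafusion::logical_plan::{LogicalPlan, LogicalPlanBuilder};

    fn build_scan() -> Result<LogicalPlan> {
        // The third argument used to be `max_concurrency: usize`; it is now a context.
        LogicalPlanBuilder::scan_parquet_with_name(
            "/data/sales.parquet", // hypothetical path
            None,                  // no projection
            create_datafusion_context_concurrency(24),
            "sales",
        )?
        .build()
    }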

ballista/rust/core/src/serde/physical_plan/from_proto.rs

Lines changed: 10 additions & 5 deletions

@@ -29,11 +29,13 @@ use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
 use crate::serde::protobuf::ShuffleReaderPartition;
 use crate::serde::scheduler::PartitionLocation;
 use crate::serde::{from_proto_binary_op, proto_error, protobuf};
+use crate::utils::create_datafusion_context_concurrency;
 use crate::{convert_box_required, convert_required, into_required};
 use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
 use datafusion::catalog::catalog::{
     CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
 };
+use datafusion::datasource::object_store::ObjectStoreRegistry;
 use datafusion::execution::context::{
     ExecutionConfig, ExecutionContextState, ExecutionProps,
 };

@@ -129,14 +131,13 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
             }
             PhysicalPlanType::ParquetScan(scan) => {
                 let projection = scan.projection.iter().map(|i| *i as usize).collect();
-                let filenames: Vec<&str> =
-                    scan.filename.iter().map(|s| s.as_str()).collect();
-                Ok(Arc::new(ParquetExec::try_from_files(
-                    &filenames,
+                let path: &str = scan.filename[0].as_str();
+                Ok(Arc::new(ParquetExec::try_from_path(
+                    path,
                     Some(projection),
                     None,
                     scan.batch_size as usize,
-                    scan.num_partitions as usize,
+                    create_datafusion_context_concurrency(scan.num_partitions as usize),
                     None,
                 )?))
             }

@@ -614,13 +615,17 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {

                 let catalog_list =
                     Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;
+
+                let object_store_registry = Arc::new(ObjectStoreRegistry::new());
+
                 let ctx_state = ExecutionContextState {
                     catalog_list,
                     scalar_functions: Default::default(),
                     var_provider: Default::default(),
                     aggregate_functions: Default::default(),
                     config: ExecutionConfig::new(),
                     execution_props: ExecutionProps::new(),
+                    object_store_registry,
                 };

                 let fun_expr = functions::create_physical_fun(

ballista/rust/core/src/serde/physical_plan/to_proto.rs

Lines changed: 1 addition & 1 deletion

@@ -259,7 +259,7 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
             let filenames = exec
                 .partitions()
                 .iter()
-                .flat_map(|part| part.filenames().to_owned())
+                .flat_map(|part| part.filenames())
                 .collect();
             Ok(protobuf::PhysicalPlanNode {
                 physical_plan_type: Some(PhysicalPlanType::ParquetScan(

ballista/rust/core/src/utils.rs

Lines changed: 5 additions & 0 deletions

@@ -252,6 +252,11 @@ pub fn create_datafusion_context(
     ExecutionContext::with_config(config)
 }

+/// Create a DataFusion context with the given concurrency, compatible with Ballista
+pub fn create_datafusion_context_concurrency(concurrency: usize) -> ExecutionContext {
+    ExecutionContext::with_concurrency(concurrency)
+}
+
 pub struct BallistaQueryPlanner {
     scheduler_url: String,
     config: BallistaConfig,
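
A hedged usage sketch of the new helper together with the ParquetTable signature shown in the benchmark and flight-server hunks below; the path is hypothetical, and the example assumes these items are exported as written:

    use ballista_core::utils::create_datafusion_context_concurrency;
    use datafusion::datasource::parquet::ParquetTable;
    use datafusion::datasource::TableProvider;
    use datafusion::error::Result;

    fn open_table() -> Result<()> {
        // The context replaces the old `max_concurrency: usize` argument.
        let ctx = create_datafusion_context_concurrency(8);
        let table = ParquetTable::try_new("/data/sales.parquet", ctx)?;
        println!("schema: {:?}", table.schema());
        Ok(())
    }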

ballista/rust/scheduler/src/lib.rs

Lines changed: 12 additions & 16 deletions

@@ -85,7 +85,8 @@ use self::state::{ConfigBackendClient, SchedulerState};
 use ballista_core::config::BallistaConfig;
 use ballista_core::execution_plans::ShuffleWriterExec;
 use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
-use datafusion::physical_plan::parquet::ParquetExec;
+use ballista_core::utils::create_datafusion_context_concurrency;
+use datafusion::datasource::parquet::ParquetRootDesc;
 use datafusion::prelude::{ExecutionConfig, ExecutionContext};
 use std::time::{Instant, SystemTime, UNIX_EPOCH};

@@ -285,24 +286,19 @@ impl SchedulerGrpc for SchedulerServer {

         match file_type {
             FileType::Parquet => {
-                let parquet_exec =
-                    ParquetExec::try_from_path(&path, None, None, 1024, 1, None)
-                        .map_err(|e| {
-                            let msg = format!("Error opening parquet files: {}", e);
-                            error!("{}", msg);
-                            tonic::Status::internal(msg)
-                        })?;
+                let ctx = create_datafusion_context_concurrency(1);
+                let parquet_desc = ParquetRootDesc::new(&path, ctx).map_err(|e| {
+                    let msg = format!("Error opening parquet files: {}", e);
+                    error!("{}", msg);
+                    tonic::Status::internal(msg)
+                })?;

                 //TODO include statistics and any other info needed to reconstruct ParquetExec
                 Ok(Response::new(GetFileMetadataResult {
-                    schema: Some(parquet_exec.schema().as_ref().into()),
-                    partitions: parquet_exec
-                        .partitions()
-                        .iter()
-                        .map(|part| FilePartitionMetadata {
-                            filename: part.filenames().to_vec(),
-                        })
-                        .collect(),
+                    schema: Some(parquet_desc.schema().as_ref().into()),
+                    partitions: vec![FilePartitionMetadata {
+                        filename: vec![path],
+                    }],
                 }))
             }
             //TODO implement for CSV
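
A minimal sketch of the new metadata path, assuming only the two calls this hunk uses (ParquetRootDesc::new and schema()); the path is hypothetical:

    use ballista_core::utils::create_datafusion_context_concurrency;
    use datafusion::datasource::parquet::ParquetRootDesc;
    use datafusion::error::Result;

    fn fetch_schema() -> Result<()> {
        // A descriptor over a root path (a single file or a directory of files);
        // the context supplies the object stores used to resolve it.
        let ctx = create_datafusion_context_concurrency(1);
        let desc = ParquetRootDesc::new("/data/warehouse/sales", ctx)?;
        println!("schema: {:?}", desc.schema());
        Ok(())
    }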

benchmarks/src/bin/tpch.rs

Lines changed: 4 additions & 1 deletion

@@ -475,7 +475,10 @@ fn get_table(
         }
         "parquet" => {
             let path = format!("{}/{}", path, table);
-            Ok(Arc::new(ParquetTable::try_new(&path, max_concurrency)?))
+            Ok(Arc::new(ParquetTable::try_new(
+                &path,
+                ExecutionContext::with_concurrency(max_concurrency),
+            )?))
         }
         other => {
             unimplemented!("Invalid file format '{}'", other);

datafusion-examples/examples/flight_server.rs

Lines changed: 5 additions & 1 deletion

@@ -65,7 +65,11 @@ impl FlightService for FlightServiceImpl {
     ) -> Result<Response<SchemaResult>, Status> {
         let request = request.into_inner();

-        let table = ParquetTable::try_new(&request.path[0], num_cpus::get()).unwrap();
+        let table = ParquetTable::try_new(
+            &request.path[0],
+            ExecutionContext::with_concurrency(num_cpus::get()),
+        )
+        .unwrap();

         let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
         let schema_result = SchemaAsIpc::new(table.schema().as_ref(), &options).into();

datafusion/src/datasource/csv.rs

Lines changed: 5 additions & 2 deletions

@@ -40,12 +40,14 @@ use std::string::String;
 use std::sync::{Arc, Mutex};

 use crate::datasource::datasource::Statistics;
+use crate::datasource::local::LocalFileSystem;
+use crate::datasource::object_store::ObjectStore;
 use crate::datasource::{Source, TableProvider};
 use crate::error::{DataFusionError, Result};
 use crate::logical_plan::Expr;
 use crate::physical_plan::csv::CsvExec;
 pub use crate::physical_plan::csv::CsvReadOptions;
-use crate::physical_plan::{common, ExecutionPlan};
+use crate::physical_plan::ExecutionPlan;

 /// Represents a CSV file with a provided schema
 pub struct CsvFile {

@@ -64,7 +66,8 @@ impl CsvFile {
         let schema = Arc::new(match options.schema {
             Some(s) => s.clone(),
             None => {
-                let filenames = common::build_file_list(&path, options.file_extension)?;
+                let filenames = LocalFileSystem
+                    .list_all_files(path.as_str(), options.file_extension)?;
                 if filenames.is_empty() {
                     return Err(DataFusionError::Plan(format!(
                         "No files found at {path} with file extension {file_extension}",

datafusion/src/datasource/json.rs

Lines changed: 4 additions & 2 deletions

@@ -30,14 +30,15 @@ use crate::{
     datasource::{Source, TableProvider},
     error::{DataFusionError, Result},
     physical_plan::{
-        common,
         json::{NdJsonExec, NdJsonReadOptions},
         ExecutionPlan,
     },
 };
 use arrow::{datatypes::SchemaRef, json::reader::infer_json_schema_from_seekable};

 use super::datasource::Statistics;
+use crate::datasource::local::LocalFileSystem;
+use crate::datasource::object_store::ObjectStore;

 trait SeekRead: Read + Seek {}

@@ -57,7 +58,8 @@ impl NdJsonFile {
         let schema = if let Some(schema) = options.schema {
             schema
         } else {
-            let filenames = common::build_file_list(path, options.file_extension)?;
+            let filenames =
+                LocalFileSystem.list_all_files(path, options.file_extension)?;
             if filenames.is_empty() {
                 return Err(DataFusionError::Plan(format!(
                     "No files found at {path} with file extension {file_extension}",

datafusion/src/datasource/local.rs

Lines changed: 126 additions & 0 deletions

@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Object store that represents the Local File System.
+
+use crate::datasource::object_store::{ObjectReader, ObjectStore};
+use crate::error::DataFusionError;
+use crate::error::Result;
+use crate::parquet::file::reader::{ChunkReader, Length};
+use std::any::Any;
+use std::fs;
+use std::fs::{metadata, File};
+use std::io::Read;
+use std::sync::Arc;
+
+/// Local File System as Object Store.
+#[derive(Debug)]
+pub struct LocalFileSystem;
+
+impl ObjectStore for LocalFileSystem {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn list_all_files(&self, path: &str, ext: &str) -> Result<Vec<String>> {
+        list_all(path, ext)
+    }
+
+    fn get_reader(&self, file_path: &str) -> Result<Arc<dyn ObjectReader>> {
+        let file = File::open(file_path)?;
+        let reader = LocalFSObjectReader::new(file)?;
+        Ok(Arc::new(reader))
+    }
+}
+
+struct LocalFSObjectReader {
+    file: File,
+}
+
+impl LocalFSObjectReader {
+    fn new(file: File) -> Result<Self> {
+        Ok(Self { file })
+    }
+}
+
+impl ObjectReader for LocalFSObjectReader {
+    fn get_reader(&self, start: u64, length: usize) -> Box<dyn Read> {
+        Box::new(FileSegmentReader::new(
+            self.file.try_clone().unwrap(),
+            start,
+            length,
+        ))
+    }
+
+    fn length(&self) -> u64 {
+        self.file.len()
+    }
+}
+
+struct FileSegmentReader {
+    file: File,
+    start: u64,
+    length: usize,
+}
+
+impl FileSegmentReader {
+    fn new(file: File, start: u64, length: usize) -> Self {
+        Self {
+            file,
+            start,
+            length,
+        }
+    }
+}
+
+impl Read for FileSegmentReader {
+    fn read(&mut self, buf: &mut [u8]) -> std::result::Result<usize, std::io::Error> {
+        let mut file_source = self.file.get_read(self.start, self.length)?;
+        file_source.read(buf)
+    }
+}
+
+fn list_all(root_path: &str, ext: &str) -> Result<Vec<String>> {
+    let mut filenames: Vec<String> = Vec::new();
+    list_all_files(root_path, &mut filenames, ext)?;
+    Ok(filenames)
+}
+
+/// Recursively build a list of files in a directory with a given extension,
+/// using an accumulator list
+fn list_all_files(dir: &str, filenames: &mut Vec<String>, ext: &str) -> Result<()> {
+    let metadata = metadata(dir)?;
+    if metadata.is_file() {
+        if dir.ends_with(ext) {
+            filenames.push(dir.to_string());
+        }
+    } else {
+        for entry in fs::read_dir(dir)? {
+            let entry = entry?;
+            let path = entry.path();
+            if let Some(path_name) = path.to_str() {
+                if path.is_dir() {
+                    list_all_files(path_name, filenames, ext)?;
+                } else if path_name.ends_with(ext) {
+                    filenames.push(path_name.to_string());
+                }
+            } else {
+                return Err(DataFusionError::Plan("Invalid path".to_string()));
+            }
+        }
+    }
+    Ok(())
+}
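
A hedged end-to-end sketch of the two new traits, using only methods defined in this file; the path is hypothetical. The range-based get_reader(start, length) is the hook that lets a store serve byte-range reads instead of whole files:

    use datafusion::datasource::local::LocalFileSystem;
    use datafusion::datasource::object_store::ObjectStore;
    use datafusion::error::Result;
    use std::io::Read;

    fn read_prefix() -> Result<Vec<u8>> {
        let store = LocalFileSystem;
        let object = store.get_reader("/data/sales.parquet")?; // Arc<dyn ObjectReader>
        let mut segment = object.get_reader(0, 8); // a Read over bytes [0, 8)
        let mut buf = vec![0u8; 8];
        let n = segment.read(&mut buf)?;
        buf.truncate(n);
        Ok(buf)
    }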
