Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed May 29, 2022
1 parent 43a28c9 commit bc14102
Show file tree
Hide file tree
Showing 14 changed files with 139 additions and 45 deletions.
97 changes: 87 additions & 10 deletions ballista-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions ballista-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ readme = "README.md"
[dependencies]
ballista = { path = "../ballista/rust/client", version = "0.7.0" }
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
datafusion = { git = "https://github.com/tustvold/arrow-datafusion", rev = "bc2668d7b3537417a26560255875ed82db2d5cc4" }
datafusion-cli = { git = "https://github.com/tustvold/arrow-datafusion", rev = "bc2668d7b3537417a26560255875ed82db2d5cc4" }
dirs = "4.0.0"
env_logger = "0.9"
mimalloc = { version = "0.1", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion ballista/rust/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ ballista-core = { path = "../core", version = "0.7.0" }
ballista-executor = { path = "../executor", version = "0.7.0", optional = true }
ballista-scheduler = { path = "../scheduler", version = "0.7.0", optional = true }

datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
datafusion = { git = "https://github.com/tustvold/arrow-datafusion", rev = "bc2668d7b3537417a26560255875ed82db2d5cc4" }
futures = "0.3"
log = "0.4"
parking_lot = "0.12"
Expand Down
4 changes: 2 additions & 2 deletions ballista/rust/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ arrow-flight = { version = "14.0.0" }
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
datafusion = { git = "https://github.com/tustvold/arrow-datafusion", rev = "bc2668d7b3537417a26560255875ed82db2d5cc4" }
datafusion-proto = { git = "https://github.com/tustvold/arrow-datafusion", rev = "bc2668d7b3537417a26560255875ed82db2d5cc4" }
futures = "0.3"
hashbrown = "0.12"

Expand Down
1 change: 1 addition & 0 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ message FileScanExecConf {
ScanLimit limit = 5;
Statistics statistics = 6;
repeated string table_partition_cols = 7;
string object_store_url = 8;
}

message ParquetScanExecNode {
Expand Down
31 changes: 16 additions & 15 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ use datafusion::datasource::file_format::avro::AvroFormat;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::logical_plan::plan::{
Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, Window,
};
Expand Down Expand Up @@ -219,6 +221,7 @@ impl AsLogicalPlan for LogicalPlanNode {
FileFormatType::Avro(..) => Arc::new(AvroFormat::default()),
};

let url = ListingTableUrl::parse(&scan.path)?;
let options = ListingOptions {
file_extension: scan.file_extension.clone(),
format: file_format,
Expand All @@ -227,24 +230,20 @@ impl AsLogicalPlan for LogicalPlanNode {
target_partitions: scan.target_partitions as usize,
};

let object_store = ctx
.runtime_env()
.object_store(scan.path.as_str())
.map_err(|e| {
BallistaError::NotImplemented(format!(
"No object store is registered for path {}: {:?}",
scan.path, e
))
})?
.0;
let object_store = ctx.runtime_env().object_store(&url).map_err(|e| {
BallistaError::NotImplemented(format!(
"No object store is registered for path {}: {:?}",
scan.path, e
))
})?;

println!(
"Found object store {:?} for path {}",
object_store,
scan.path.as_str()
);

let config = ListingTableConfig::new(object_store, scan.path.as_str())
let config = ListingTableConfig::new(object_store, url)
.with_listing_options(options)
.with_schema(Arc::new(schema));

Expand Down Expand Up @@ -595,7 +594,7 @@ impl AsLogicalPlan for LogicalPlanNode {
.options()
.table_partition_cols
.clone(),
path: listing_table.table_path().to_owned(),
path: listing_table.table_path().to_string(),
schema: Some(schema),
projection,
filters,
Expand Down Expand Up @@ -1073,6 +1072,7 @@ mod roundtrip_tests {
};
use std::io;
use std::sync::Arc;
use datafusion::datasource::listing::ListingTableUrl;

#[derive(Debug)]
struct TestObjectStore {}
Expand Down Expand Up @@ -1355,9 +1355,10 @@ mod roundtrip_tests {
ctx.runtime_env()
.register_object_store("test", custom_object_store.clone());

let (os, uri) = ctx.runtime_env().object_store("test://foo.csv")?;
let url = ListingTableUrl::parse("test://foo.csv").unwrap();
let os = ctx.runtime_env().object_store(&url)?;
assert_eq!("TestObjectStore", &format!("{:?}", os));
assert_eq!("foo.csv", uri);
assert_eq!("test://foo.csv", &url.to_string());

let schema = test_schema();
let plan = ctx
Expand Down
2 changes: 2 additions & 0 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use datafusion::datafusion_data_access::{
object_store::local::LocalFileSystem, FileMeta, SizedFile,
};
use datafusion::datasource::listing::{FileRange, PartitionedFile};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_plan::FunctionRegistry;

Expand Down Expand Up @@ -381,6 +382,7 @@ impl TryInto<FileScanConfig> for &protobuf::FileScanExecConf {

Ok(FileScanConfig {
object_store: Arc::new(LocalFileSystem {}),
object_store_url: ObjectStoreUrl::parse(&self.object_store_url)?,
file_schema: schema,
file_groups: self
.file_groups
Expand Down
Loading

0 comments on commit bc14102

Please sign in to comment.