
Commit 50ddf43

Merge remote-tracking branch 'apache/master' into alamb/consolidate_parquet_tests

2 parents 00871b1 + 175adbd

File tree: 32 files changed, +799 −422 lines

.github/pull_request_template.md

Lines changed: 10 additions & 0 deletions
@@ -19,6 +19,16 @@ Closes #.
 There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.
 -->

+# Are these changes tested?
+
+<!--
+We typically require tests for all PRs in order to:
+1. Prevent the code from being accidentally broken by subsequent changes
+2. Serve as another way to document the expected behavior of the code
+
+If tests are not included in your PR, please explain why (for example, are they covered by existing tests)?
+-->
+
 # Are there any user-facing changes?

 <!--

.github/workflows/rust.yml

Lines changed: 33 additions & 0 deletions
@@ -477,3 +477,36 @@ jobs:
           # ignore ./Cargo.toml because putting workspaces in multi-line lists make it easy to read
           ci/scripts/rust_toml_fmt.sh
           git diff --exit-code
+
+  config-docs-check:
+    name: check configs.md is up-to-date
+    needs: [linux-build-lib]
+    runs-on: ubuntu-latest
+    container:
+      image: amd64/rust
+    env:
+      # Disable full debug symbol generation to speed up CI build and keep memory down
+      # "1" means line tables only, which is useful for panic tracebacks.
+      RUSTFLAGS: "-C debuginfo=1"
+    steps:
+      - uses: actions/checkout@v3
+        with:
+          submodules: true
+      - name: Cache Cargo
+        uses: actions/cache@v3
+        with:
+          path: /github/home/.cargo
+          # this key equals the ones on `linux-build-lib` for re-use
+          key: cargo-cache-
+      - name: Setup Rust toolchain
+        uses: ./.github/actions/setup-builder
+        with:
+          rust-version: stable
+      - uses: actions/setup-node@v3
+        with:
+          node-version: "14"
+      - name: Check if configs.md has been modified
+        run: |
+          # If you encounter an error, run './dev/update_config_docs.sh' and commit
+          ./dev/update_config_docs.sh
+          git diff --exit-code

benchmarks/src/bin/parquet_filter_pushdown.rs

Lines changed: 19 additions & 17 deletions
@@ -21,6 +21,7 @@ use datafusion::logical_expr::{lit, or, Expr};
 use datafusion::optimizer::utils::disjunction;
 use datafusion::physical_plan::collect;
 use datafusion::prelude::{col, SessionConfig, SessionContext};
+use parquet::file::properties::WriterProperties;
 use parquet_test_utils::{ParquetScanOptions, TestParquetFile};
 use std::path::PathBuf;
 use std::time::Instant;
@@ -73,7 +74,19 @@ async fn main() -> Result<()> {

     let path = opt.path.join("logs.parquet");

-    let test_file = gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?;
+    let mut props_builder = WriterProperties::builder();
+
+    if let Some(s) = opt.page_size {
+        props_builder = props_builder
+            .set_data_pagesize_limit(s)
+            .set_write_batch_size(s);
+    }
+
+    if let Some(s) = opt.row_group_size {
+        props_builder = props_builder.set_max_row_group_size(s);
+    }
+
+    let test_file = gen_data(path, opt.scale_factor, props_builder.build())?;

     run_benchmarks(&mut ctx, &test_file, opt.iterations, opt.debug).await?;

@@ -137,14 +150,9 @@ async fn run_benchmarks(
         println!("Using scan options {:?}", scan_options);
         for i in 0..iterations {
             let start = Instant::now();
-            let rows = exec_scan(
-                ctx,
-                test_file,
-                filter_expr.clone(),
-                scan_options.clone(),
-                debug,
-            )
-            .await?;
+            let rows =
+                exec_scan(ctx, test_file, filter_expr.clone(), *scan_options, debug)
+                    .await?;
             println!(
                 "Iteration {} returned {} rows in {} ms",
                 i,
@@ -179,17 +187,11 @@ async fn exec_scan(
 fn gen_data(
     path: PathBuf,
     scale_factor: f32,
-    page_size: Option<usize>,
-    row_group_size: Option<usize>,
+    props: WriterProperties,
 ) -> Result<TestParquetFile> {
     let generator = AccessLogGenerator::new();

     let num_batches = 100_f32 * scale_factor;

-    TestParquetFile::try_new(
-        path,
-        generator.take(num_batches as usize),
-        page_size,
-        row_group_size,
-    )
+    TestParquetFile::try_new(path, props, generator.take(num_batches as usize))
 }
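
Note: the change above collapses `gen_data`'s two `Option<usize>` knobs into a single `WriterProperties`, so future writer settings can flow through without another signature change. A minimal sketch of building such properties with the parquet crate's builder (the values and the compression setting are illustrative, not taken from the benchmark):

use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;

fn example_writer_props() -> WriterProperties {
    // Illustrative values; only the builder pattern mirrors the benchmark.
    WriterProperties::builder()
        .set_data_pagesize_limit(1024 * 1024) // cap each data page at ~1 MiB
        .set_write_batch_size(1024) // rows written per batch
        .set_max_row_group_size(1024 * 1024) // rows per row group
        .set_compression(Compression::SNAPPY) // an extra knob, same plumbing
        .build()
}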

datafusion-cli/src/main.rs

Lines changed: 28 additions & 3 deletions
@@ -16,6 +16,9 @@
 // under the License.

 use clap::Parser;
+use datafusion::datasource::datasource::TableProviderFactory;
+use datafusion::datasource::file_format::file_type::FileType;
+use datafusion::datasource::listing_table_factory::ListingTableFactory;
 use datafusion::datasource::object_store::ObjectStoreRegistry;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::SessionConfig;
@@ -26,6 +29,7 @@ use datafusion_cli::{
     exec, print_format::PrintFormat, print_options::PrintOptions, DATAFUSION_CLI_VERSION,
 };
 use mimalloc::MiMalloc;
+use std::collections::HashMap;
 use std::env;
 use std::path::Path;
 use std::sync::Arc;
@@ -93,7 +97,7 @@ pub async fn main() -> Result<()> {

     if let Some(ref path) = args.data_path {
         let p = Path::new(path);
-        env::set_current_dir(&p).unwrap();
+        env::set_current_dir(p).unwrap();
     };

     let mut session_config = SessionConfig::from_env().with_information_schema(true);
@@ -105,6 +109,7 @@ pub async fn main() -> Result<()> {
     let runtime_env = create_runtime_env()?;
     let mut ctx =
         SessionContext::with_config_rt(session_config.clone(), Arc::new(runtime_env));
+    ctx.refresh_catalogs().await?;

     let mut print_options = PrintOptions {
         format: args.format,
@@ -142,11 +147,31 @@ pub async fn main() -> Result<()> {
 }

 fn create_runtime_env() -> Result<RuntimeEnv> {
+    let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
+        HashMap::new();
+    table_factories.insert(
+        "csv".to_string(),
+        Arc::new(ListingTableFactory::new(FileType::CSV)),
+    );
+    table_factories.insert(
+        "parquet".to_string(),
+        Arc::new(ListingTableFactory::new(FileType::PARQUET)),
+    );
+    table_factories.insert(
+        "avro".to_string(),
+        Arc::new(ListingTableFactory::new(FileType::AVRO)),
+    );
+    table_factories.insert(
+        "json".to_string(),
+        Arc::new(ListingTableFactory::new(FileType::JSON)),
+    );
+
     let object_store_provider = DatafusionCliObjectStoreProvider {};
     let object_store_registry =
         ObjectStoreRegistry::new_with_provider(Some(Arc::new(object_store_provider)));
-    let rn_config =
-        RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry));
+    let rn_config = RuntimeConfig::new()
+        .with_object_store_registry(Arc::new(object_store_registry))
+        .with_table_factories(table_factories);
     RuntimeEnv::new(rn_config)
 }

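Note: beyond being looked up through the runtime config, each registered factory can be driven directly. A hedged sketch of creating and registering a provider for one CSV path, assuming a `SessionState` is already in hand (the path and table name are illustrative):

use datafusion::datasource::datasource::TableProviderFactory;
use datafusion::datasource::file_format::file_type::FileType;
use datafusion::datasource::listing_table_factory::ListingTableFactory;
use datafusion::error::Result;
use datafusion::execution::context::{SessionContext, SessionState};

// Illustrative: build the same factory registered for "csv" above and
// register the provider it creates under a table name of our choosing.
async fn register_one_csv(ctx: &SessionContext, state: &SessionState) -> Result<()> {
    let factory = ListingTableFactory::new(FileType::CSV);
    let provider = factory.create(state, "file:///data/logs.csv").await?;
    ctx.register_table("logs", provider)?;
    Ok(())
}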
datafusion-cli/src/object_storage.rs

Lines changed: 7 additions & 1 deletion
@@ -141,7 +141,13 @@ mod tests {
         assert!(err.to_string().contains("Generic S3 error: Missing region"));

         env::set_var("AWS_REGION", "us-east-1");
-        assert!(provider.get_by_url(&Url::from_str(s3).unwrap()).is_ok());
+        let url = Url::from_str(s3).expect("Unable to parse s3 url");
+        let res = provider.get_by_url(&url);
+        let msg = match res {
+            Err(e) => format!("{}", e),
+            Ok(_) => "".to_string()
+        };
+        assert_eq!("".to_string(), msg); // Fail with error message
         env::remove_var("AWS_REGION");
     }
 }

datafusion-cli/src/print_format.rs

Lines changed: 6 additions & 6 deletions
@@ -107,9 +107,9 @@ mod tests {
         let batch = RecordBatch::try_new(
             schema,
             vec![
-                Arc::new(Int32Array::from_slice(&[1, 2, 3])),
-                Arc::new(Int32Array::from_slice(&[4, 5, 6])),
-                Arc::new(Int32Array::from_slice(&[7, 8, 9])),
+                Arc::new(Int32Array::from_slice([1, 2, 3])),
+                Arc::new(Int32Array::from_slice([4, 5, 6])),
+                Arc::new(Int32Array::from_slice([7, 8, 9])),
             ],
         )
         .unwrap();
@@ -137,9 +137,9 @@ mod tests {
         let batch = RecordBatch::try_new(
             schema,
             vec![
-                Arc::new(Int32Array::from_slice(&[1, 2, 3])),
-                Arc::new(Int32Array::from_slice(&[4, 5, 6])),
-                Arc::new(Int32Array::from_slice(&[7, 8, 9])),
+                Arc::new(Int32Array::from_slice([1, 2, 3])),
+                Arc::new(Int32Array::from_slice([4, 5, 6])),
+                Arc::new(Int32Array::from_slice([7, 8, 9])),
             ],
         )
         .unwrap();

datafusion-examples/README.md

Lines changed: 1 addition & 1 deletion
@@ -44,4 +44,4 @@ Run `git submodule update --init` to init test files.

 ## Distributed

-- [`flight-client.rs`](examples/flight-client.rs) and [`flight-server.rs`](examples/flight-server.rs): Run DataFusion as a standalone process and execute SQL queries from a client using the Flight protocol.
+- [`flight_client.rs`](examples/flight_client.rs) and [`flight_server.rs`](examples/flight_server.rs): Run DataFusion as a standalone process and execute SQL queries from a client using the Flight protocol.

datafusion/core/src/catalog/listing_schema.rs

Lines changed: 16 additions & 7 deletions
@@ -19,8 +19,10 @@
 use crate::catalog::schema::SchemaProvider;
 use crate::datasource::datasource::TableProviderFactory;
 use crate::datasource::TableProvider;
-use datafusion_common::DataFusionError;
+use crate::execution::context::SessionState;
+use datafusion_common::{context, DataFusionError};
 use futures::TryStreamExt;
+use itertools::Itertools;
 use object_store::ObjectStore;
 use std::any::Any;
 use std::collections::{HashMap, HashSet};
@@ -72,7 +74,7 @@ impl ListingSchemaProvider {
     }

     /// Reload table information from ObjectStore
-    pub async fn refresh(&self) -> datafusion_common::Result<()> {
+    pub async fn refresh(&self, state: &SessionState) -> datafusion_common::Result<()> {
         let entries: Vec<_> = self
             .store
             .list(Some(&self.path))
@@ -100,13 +102,20 @@ impl ListingSchemaProvider {
                 .ok_or_else(|| {
                     DataFusionError::Internal("Cannot parse file name!".to_string())
                 })?;
-            let table_name = table.to_str().ok_or_else(|| {
+            let table_name = file_name.split('.').collect_vec()[0];
+            let table_path = table.to_str().ok_or_else(|| {
                 DataFusionError::Internal("Cannot parse file name!".to_string())
             })?;
-            if !self.table_exist(file_name) {
-                let table_name = format!("{}/{}", self.authority, table_name);
-                let provider = self.factory.create(table_name.as_str()).await?;
-                let _ = self.register_table(file_name.to_string(), provider.clone())?;
+            if !self.table_exist(table_name) {
+                let table_url = format!("{}/{}", self.authority, table_path);
+                let provider = self
+                    .factory
+                    .create(state, table_url.as_str())
+                    .await
+                    .map_err(|e| {
+                        context!(format!("Could not create table for {}", table_url), e)
+                    })?;
+                let _ = self.register_table(table_name.to_string(), provider.clone())?;
             }
         }
         Ok(())
datafusion/core/src/datasource/datasource.rs

Lines changed: 5 additions & 1 deletion
@@ -92,5 +92,9 @@ pub trait TableProvider: Sync + Send {
 #[async_trait]
 pub trait TableProviderFactory: Sync + Send {
     /// Create a TableProvider with the given url
-    async fn create(&self, url: &str) -> Result<Arc<dyn TableProvider>>;
+    async fn create(
+        &self,
+        ctx: &SessionState,
+        url: &str,
+    ) -> Result<Arc<dyn TableProvider>>;
 }
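
Note: implementors now receive the `SessionState`, which is what lets factories like the `ListingTableFactory` below infer schemas at creation time. A minimal sketch of implementing the new signature (the factory type and its fixed-schema behavior are invented for illustration):

use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::datasource::datasource::TableProviderFactory;
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::Result;
use datafusion::execution::context::SessionState;
use std::sync::Arc;

// Hypothetical factory that ignores the URL and serves an empty in-memory table.
struct StaticTableFactory {
    schema: SchemaRef,
}

#[async_trait]
impl TableProviderFactory for StaticTableFactory {
    async fn create(
        &self,
        _ctx: &SessionState, // newly available: config, runtime env, schema inference
        _url: &str,
    ) -> Result<Arc<dyn TableProvider>> {
        let table = MemTable::try_new(self.schema.clone(), vec![vec![]])?;
        Ok(Arc::new(table))
    }
}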
datafusion/core/src/datasource/listing_table_factory.rs

Lines changed: 79 additions & 0 deletions

@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Factory for creating ListingTables with default options
+
+use crate::datasource::datasource::TableProviderFactory;
+use crate::datasource::file_format::avro::AvroFormat;
+use crate::datasource::file_format::csv::CsvFormat;
+use crate::datasource::file_format::file_type::{FileType, GetExt};
+use crate::datasource::file_format::json::JsonFormat;
+use crate::datasource::file_format::parquet::ParquetFormat;
+use crate::datasource::file_format::FileFormat;
+use crate::datasource::listing::{
+    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
+};
+use crate::datasource::TableProvider;
+use crate::execution::context::SessionState;
+use async_trait::async_trait;
+use std::sync::Arc;
+
+/// A `TableProviderFactory` capable of creating new `ListingTable`s
+pub struct ListingTableFactory {
+    file_type: FileType,
+}
+
+impl ListingTableFactory {
+    /// Creates a new `ListingTableFactory`
+    pub fn new(file_type: FileType) -> Self {
+        Self { file_type }
+    }
+}
+
+#[async_trait]
+impl TableProviderFactory for ListingTableFactory {
+    async fn create(
+        &self,
+        state: &SessionState,
+        url: &str,
+    ) -> datafusion_common::Result<Arc<dyn TableProvider>> {
+        let file_extension = self.file_type.get_ext();
+
+        let file_format: Arc<dyn FileFormat> = match self.file_type {
+            FileType::CSV => Arc::new(CsvFormat::default()),
+            FileType::PARQUET => Arc::new(ParquetFormat::default()),
+            FileType::AVRO => Arc::new(AvroFormat::default()),
+            FileType::JSON => Arc::new(JsonFormat::default()),
+        };
+
+        let options = ListingOptions {
+            format: file_format,
+            collect_stat: true,
+            file_extension: file_extension.to_owned(),
+            target_partitions: 1,
+            table_partition_cols: vec![],
+        };
+
+        let table_path = ListingTableUrl::parse(url)?;
+        let resolved_schema = options.infer_schema(state, &table_path).await?;
+        let config = ListingTableConfig::new(table_path)
+            .with_listing_options(options)
+            .with_schema(resolved_schema);
+        let table = ListingTable::try_new(config)?;
+        Ok(Arc::new(table))
+    }
+}
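
Note: the factory leans on `ListingTableUrl::parse`, which accepts both local filesystem paths and object store URLs, so one code path serves `file://`, `s3://`, and friends. A small sketch (the paths are illustrative):

use datafusion::datasource::listing::ListingTableUrl;
use datafusion::error::Result;

fn main() -> Result<()> {
    // Both forms resolve to a ListingTableUrl the factory can scan.
    let local = ListingTableUrl::parse("/data/logs.csv")?;
    let remote = ListingTableUrl::parse("s3://bucket/prefix/")?;
    println!("{} {}", local, remote);
    Ok(())
}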
