Skip to content

Commit a4dd1e2

Browse files
authored
Add csv loading benchmarks. (#13544)
* Add csv loading benchmarks. * Fix fmt. * Fix clippy.
1 parent 948949e commit a4dd1e2

File tree

6 files changed

+184
-7
lines changed

6 files changed

+184
-7
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ datafusion/sqllogictest/test_files/scratch*
6767
# temp file for core
6868
datafusion/core/*.parquet
6969

70+
# Generated core benchmark data
71+
datafusion/core/benches/data/*
72+
7073
# rat
7174
filtered_rat.txt
7275
rat.txt

datafusion/core/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,10 @@ nix = { version = "0.29.0", features = ["fs"] }
159159
harness = false
160160
name = "aggregate_query_sql"
161161

162+
[[bench]]
163+
harness = false
164+
name = "csv_load"
165+
162166
[[bench]]
163167
harness = false
164168
name = "distinct_query_sql"
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#[macro_use]
19+
extern crate criterion;
20+
extern crate arrow;
21+
extern crate datafusion;
22+
23+
mod data_utils;
24+
use crate::criterion::Criterion;
25+
use datafusion::error::Result;
26+
use datafusion::execution::context::SessionContext;
27+
use datafusion::prelude::CsvReadOptions;
28+
use datafusion::test_util::csv::TestCsvFile;
29+
use parking_lot::Mutex;
30+
use std::sync::Arc;
31+
use std::time::Duration;
32+
use test_utils::AccessLogGenerator;
33+
use tokio::runtime::Runtime;
34+
35+
/// Reads the CSV file at `path` through the shared session context and
/// collects all of the resulting record batches.
///
/// A new Tokio runtime is created on every call and drives both the
/// `read_csv` future and the `collect` future to completion. The collected
/// batches are passed to `criterion::black_box`.
///
/// # Panics
///
/// Panics if the runtime cannot be created, or if reading the file or
/// collecting the results returns an error.
fn load_csv(ctx: Arc<Mutex<SessionContext>>, path: &str, options: CsvReadOptions) {
    let runtime = Runtime::new().unwrap();

    // Hold the context lock only while the read is being planned.
    let session = ctx.lock();
    let frame = runtime.block_on(session.read_csv(path, options)).unwrap();
    drop(session);

    let batches = runtime.block_on(frame.collect()).unwrap();
    criterion::black_box(batches);
}
40+
41+
fn create_context() -> Result<Arc<Mutex<SessionContext>>> {
42+
let ctx = SessionContext::new();
43+
Ok(Arc::new(Mutex::new(ctx)))
44+
}
45+
46+
fn generate_test_file() -> TestCsvFile {
47+
let write_location = std::env::current_dir()
48+
.unwrap()
49+
.join("benches")
50+
.join("data");
51+
52+
// Make sure the write directory exists.
53+
std::fs::create_dir_all(&write_location).unwrap();
54+
let file_path = write_location.join("logs.csv");
55+
56+
let generator = AccessLogGenerator::new().with_include_nulls(true);
57+
let num_batches = 2;
58+
TestCsvFile::try_new(file_path.clone(), generator.take(num_batches as usize))
59+
.expect("Failed to create test file.")
60+
}
61+
62+
fn criterion_benchmark(c: &mut Criterion) {
63+
let ctx = create_context().unwrap();
64+
let test_file = generate_test_file();
65+
66+
let mut group = c.benchmark_group("load csv testing");
67+
group.measurement_time(Duration::from_secs(20));
68+
69+
group.bench_function("default csv read options", |b| {
70+
b.iter(|| {
71+
load_csv(
72+
ctx.clone(),
73+
test_file.path().to_str().unwrap(),
74+
CsvReadOptions::default(),
75+
)
76+
})
77+
});
78+
}
79+
80+
criterion_group!(benches, criterion_benchmark);
81+
criterion_main!(benches);
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Helpers for writing CSV files to disk for use in tests and benchmarks
19+
20+
use std::fs::File;
21+
use std::path::PathBuf;
22+
use std::sync::Arc;
23+
24+
use crate::arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
25+
use crate::error::Result;
26+
27+
use arrow::csv::WriterBuilder;
28+
29+
/// a CSV file that has been created for testing.
30+
pub struct TestCsvFile {
31+
path: PathBuf,
32+
schema: SchemaRef,
33+
}
34+
35+
impl TestCsvFile {
36+
/// Creates a new csv file at the specified location
37+
pub fn try_new(
38+
path: PathBuf,
39+
batches: impl IntoIterator<Item = RecordBatch>,
40+
) -> Result<Self> {
41+
let file = File::create(&path).unwrap();
42+
let builder = WriterBuilder::new().with_header(true);
43+
let mut writer = builder.build(file);
44+
45+
let mut batches = batches.into_iter();
46+
let first_batch = batches.next().expect("need at least one record batch");
47+
let schema = first_batch.schema();
48+
49+
let mut num_rows = 0;
50+
for batch in batches {
51+
writer.write(&batch)?;
52+
num_rows += batch.num_rows();
53+
}
54+
55+
println!("Generated test dataset with {num_rows} rows");
56+
57+
Ok(Self { path, schema })
58+
}
59+
60+
/// The schema of this csv file
61+
pub fn schema(&self) -> SchemaRef {
62+
Arc::clone(&self.schema)
63+
}
64+
65+
/// The path to the csv file
66+
pub fn path(&self) -> &std::path::Path {
67+
self.path.as_path()
68+
}
69+
}

datafusion/core/src/test_util/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
#[cfg(feature = "parquet")]
2121
pub mod parquet;
2222

23+
pub mod csv;
24+
2325
use std::any::Any;
2426
use std::collections::HashMap;
2527
use std::fs::File;

test-utils/src/data_gen.rs

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ struct GeneratorOptions {
3333
pods_per_host: Range<usize>,
3434
containers_per_pod: Range<usize>,
3535
entries_per_container: Range<usize>,
36+
include_nulls: bool,
3637
}
3738

3839
impl Default for GeneratorOptions {
@@ -42,6 +43,7 @@ impl Default for GeneratorOptions {
4243
pods_per_host: 1..15,
4344
containers_per_pod: 1..3,
4445
entries_per_container: 1024..8192,
46+
include_nulls: false,
4547
}
4648
}
4749
}
@@ -149,13 +151,23 @@ impl BatchBuilder {
149151
self.image.append(image).unwrap();
150152
self.time.append_value(time);
151153

152-
self.client_addr.append_value(format!(
153-
"{}.{}.{}.{}",
154-
rng.gen::<u8>(),
155-
rng.gen::<u8>(),
156-
rng.gen::<u8>(),
157-
rng.gen::<u8>()
158-
));
154+
if self.options.include_nulls {
155+
// Append a null value if the option is set
156+
// Use both "NULL" as a string and a null value
157+
if rng.gen_bool(0.5) {
158+
self.client_addr.append_null();
159+
} else {
160+
self.client_addr.append_value("NULL");
161+
}
162+
} else {
163+
self.client_addr.append_value(format!(
164+
"{}.{}.{}.{}",
165+
rng.gen::<u8>(),
166+
rng.gen::<u8>(),
167+
rng.gen::<u8>(),
168+
rng.gen::<u8>()
169+
));
170+
}
159171
self.request_duration.append_value(rng.gen());
160172
self.request_user_agent
161173
.append_value(random_string(rng, 20..100));
@@ -317,6 +329,12 @@ impl AccessLogGenerator {
317329
self.options.entries_per_container = range;
318330
self
319331
}
332+
333+
// Set the condition for null values in the generated data
334+
pub fn with_include_nulls(mut self, include_nulls: bool) -> Self {
335+
self.options.include_nulls = include_nulls;
336+
self
337+
}
320338
}
321339

322340
impl Iterator for AccessLogGenerator {

0 commit comments

Comments
 (0)