Skip to content

Commit 09004c5

Browse files
committed
refactor: make ParquetSink tests a bit more readable
1 parent 75202b5 commit 09004c5

File tree

1 file changed

+86
-67
lines changed
  • datafusion/core/src/datasource/file_format

1 file changed

+86
-67
lines changed

datafusion/core/src/datasource/file_format/parquet.rs

Lines changed: 86 additions & 67 deletions
Original file line number | Diff line number | Diff line change
@@ -2335,42 +2335,46 @@ mod tests {
23352335
async fn parquet_sink_write() -> Result<()> {
23362336
let parquet_sink = create_written_parquet_sink("file:///").await?;
23372337

2338-
// assert written
2339-
let mut written = parquet_sink.written();
2340-
let written = written.drain();
2341-
assert_eq!(
2342-
written.len(),
2343-
1,
2344-
"expected a single parquet file to be written, instead found {}",
2345-
written.len()
2346-
);
2338+
// assert written to proper path
2339+
let (path, file_metadata) = get_written(parquet_sink)?;
2340+
let path_parts = path.parts().collect::<Vec<_>>();
2341+
assert_eq!(path_parts.len(), 1, "should not have path prefix");
23472342

23482343
// check the file metadata
2349-
let (
2350-
path,
2351-
FileMetaData {
2352-
num_rows,
2353-
schema,
2354-
key_value_metadata,
2355-
..
2344+
let expected_kv_meta = vec![
2345+
KeyValue {
2346+
key: "my-data".to_string(),
2347+
value: Some("stuff".to_string()),
2348+
},
2349+
KeyValue {
2350+
key: "my-data-bool-key".to_string(),
2351+
value: None,
23562352
},
2357-
) = written.take(1).next().unwrap();
2353+
];
2354+
assert_file_metadata(file_metadata, expected_kv_meta);
2355+
2356+
Ok(())
2357+
}
2358+
2359+
#[tokio::test]
2360+
async fn parquet_sink_parallel_write() -> Result<()> {
2361+
let opts = ParquetOptions {
2362+
allow_single_file_parallelism: true,
2363+
maximum_parallel_row_group_writers: 2,
2364+
maximum_buffered_record_batches_per_stream: 2,
2365+
..Default::default()
2366+
};
2367+
2368+
let parquet_sink =
2369+
create_written_parquet_sink_using_config("file:///", opts).await?;
2370+
2371+
// assert written to proper path
2372+
let (path, file_metadata) = get_written(parquet_sink)?;
23582373
let path_parts = path.parts().collect::<Vec<_>>();
23592374
assert_eq!(path_parts.len(), 1, "should not have path prefix");
23602375

2361-
assert_eq!(num_rows, 2, "file metadata to have 2 rows");
2362-
assert!(
2363-
schema.iter().any(|col_schema| col_schema.name == "a"),
2364-
"output file metadata should contain col a"
2365-
);
2366-
assert!(
2367-
schema.iter().any(|col_schema| col_schema.name == "b"),
2368-
"output file metadata should contain col b"
2369-
);
2370-
2371-
let mut key_value_metadata = key_value_metadata.unwrap();
2372-
key_value_metadata.sort_by(|a, b| a.key.cmp(&b.key));
2373-
let expected_metadata = vec![
2376+
// check the file metadata
2377+
let expected_kv_meta = vec![
23742378
KeyValue {
23752379
key: "my-data".to_string(),
23762380
value: Some("stuff".to_string()),
@@ -2380,7 +2384,7 @@ mod tests {
23802384
value: None,
23812385
},
23822386
];
2383-
assert_eq!(key_value_metadata, expected_metadata);
2387+
assert_file_metadata(file_metadata, expected_kv_meta);
23842388

23852389
Ok(())
23862390
}
@@ -2391,18 +2395,8 @@ mod tests {
23912395
let file_path = format!("file:///path/to/{}", filename);
23922396
let parquet_sink = create_written_parquet_sink(file_path.as_str()).await?;
23932397

2394-
// assert written
2395-
let mut written = parquet_sink.written();
2396-
let written = written.drain();
2397-
assert_eq!(
2398-
written.len(),
2399-
1,
2400-
"expected a single parquet file to be written, instead found {}",
2401-
written.len()
2402-
);
2403-
2404-
let (path, ..) = written.take(1).next().unwrap();
2405-
2398+
// assert written to proper path
2399+
let (path, _) = get_written(parquet_sink)?;
24062400
let path_parts = path.parts().collect::<Vec<_>>();
24072401
assert_eq!(
24082402
path_parts.len(),
@@ -2420,18 +2414,8 @@ mod tests {
24202414
let file_path = "file:///path/to";
24212415
let parquet_sink = create_written_parquet_sink(file_path).await?;
24222416

2423-
// assert written
2424-
let mut written = parquet_sink.written();
2425-
let written = written.drain();
2426-
assert_eq!(
2427-
written.len(),
2428-
1,
2429-
"expected a single parquet file to be written, instead found {}",
2430-
written.len()
2431-
);
2432-
2433-
let (path, ..) = written.take(1).next().unwrap();
2434-
2417+
// assert written to proper path
2418+
let (path, _) = get_written(parquet_sink)?;
24352419
let path_parts = path.parts().collect::<Vec<_>>();
24362420
assert_eq!(
24372421
path_parts.len(),
@@ -2449,18 +2433,8 @@ mod tests {
24492433
let file_path = "file:///path/to/";
24502434
let parquet_sink = create_written_parquet_sink(file_path).await?;
24512435

2452-
// assert written
2453-
let mut written = parquet_sink.written();
2454-
let written = written.drain();
2455-
assert_eq!(
2456-
written.len(),
2457-
1,
2458-
"expected a single parquet file to be written, instead found {}",
2459-
written.len()
2460-
);
2461-
2462-
let (path, ..) = written.take(1).next().unwrap();
2463-
2436+
// assert written to proper path
2437+
let (path, _) = get_written(parquet_sink)?;
24642438
let path_parts = path.parts().collect::<Vec<_>>();
24652439
assert_eq!(
24662440
path_parts.len(),
@@ -2474,6 +2448,14 @@ mod tests {
24742448
}
24752449

24762450
async fn create_written_parquet_sink(table_path: &str) -> Result<Arc<ParquetSink>> {
2451+
create_written_parquet_sink_using_config(table_path, ParquetOptions::default())
2452+
.await
2453+
}
2454+
2455+
async fn create_written_parquet_sink_using_config(
2456+
table_path: &str,
2457+
global: ParquetOptions,
2458+
) -> Result<Arc<ParquetSink>> {
24772459
let field_a = Field::new("a", DataType::Utf8, false);
24782460
let field_b = Field::new("b", DataType::Utf8, false);
24792461
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
@@ -2495,6 +2477,7 @@ mod tests {
24952477
("my-data".to_string(), Some("stuff".to_string())),
24962478
("my-data-bool-key".to_string(), None),
24972479
]),
2480+
global,
24982481
..Default::default()
24992482
},
25002483
));
@@ -2519,6 +2502,42 @@ mod tests {
25192502
Ok(parquet_sink)
25202503
}
25212504

2505+
fn get_written(parquet_sink: Arc<ParquetSink>) -> Result<(Path, FileMetaData)> {
2506+
let mut written = parquet_sink.written();
2507+
let written = written.drain();
2508+
assert_eq!(
2509+
written.len(),
2510+
1,
2511+
"expected a single parquet file to be written, instead found {}",
2512+
written.len()
2513+
);
2514+
2515+
let (path, file_metadata) = written.take(1).next().unwrap();
2516+
Ok((path, file_metadata))
2517+
}
2518+
2519+
fn assert_file_metadata(file_metadata: FileMetaData, expected_kv: Vec<KeyValue>) {
2520+
let FileMetaData {
2521+
num_rows,
2522+
schema,
2523+
key_value_metadata,
2524+
..
2525+
} = file_metadata;
2526+
assert_eq!(num_rows, 2, "file metadata to have 2 rows");
2527+
assert!(
2528+
schema.iter().any(|col_schema| col_schema.name == "a"),
2529+
"output file metadata should contain col a"
2530+
);
2531+
assert!(
2532+
schema.iter().any(|col_schema| col_schema.name == "b"),
2533+
"output file metadata should contain col b"
2534+
);
2535+
2536+
let mut key_value_metadata = key_value_metadata.unwrap();
2537+
key_value_metadata.sort_by(|a, b| a.key.cmp(&b.key));
2538+
assert_eq!(key_value_metadata, expected_kv);
2539+
}
2540+
25222541
#[tokio::test]
25232542
async fn parquet_sink_write_partitions() -> Result<()> {
25242543
let field_a = Field::new("a", DataType::Utf8, false);

0 commit comments

Comments
 (0)