Commit d6ddcb2

Add partitioned_csv setup code to sql_integration test

1 parent 15cfcbc commit d6ddcb2

5 files changed: +347 -239 lines changed

datafusion/src/execution/context.rs

Lines changed: 1 addition & 238 deletions
@@ -1281,11 +1281,9 @@ mod tests {
     use super::*;
     use crate::execution::context::QueryPlanner;
     use crate::from_slice::FromSlice;
-    use crate::logical_plan::plan::Projection;
-    use crate::logical_plan::TableScan;
     use crate::logical_plan::{binary_expr, lit, Operator};
+    use crate::physical_plan::collect;
     use crate::physical_plan::functions::{make_scalar_function, Volatility};
-    use crate::physical_plan::{collect, collect_partitioned};
     use crate::test;
     use crate::variable::VarType;
     use crate::{
@@ -1311,7 +1309,6 @@ mod tests {
     use std::thread::{self, JoinHandle};
     use std::{io::prelude::*, sync::Mutex};
     use tempfile::TempDir;
-    use test::*;

     #[tokio::test]
     async fn shared_memory_and_disk_manager() {
@@ -1347,62 +1344,6 @@ mod tests {
         ));
     }

-    #[tokio::test]
-    async fn parallel_projection() -> Result<()> {
-        let partition_count = 4;
-        let results = execute("SELECT c1, c2 FROM test", partition_count).await?;
-
-        let expected = vec![
-            "+----+----+",
-            "| c1 | c2 |",
-            "+----+----+",
-            "| 3  | 1  |",
-            "| 3  | 2  |",
-            "| 3  | 3  |",
-            "| 3  | 4  |",
-            "| 3  | 5  |",
-            "| 3  | 6  |",
-            "| 3  | 7  |",
-            "| 3  | 8  |",
-            "| 3  | 9  |",
-            "| 3  | 10 |",
-            "| 2  | 1  |",
-            "| 2  | 2  |",
-            "| 2  | 3  |",
-            "| 2  | 4  |",
-            "| 2  | 5  |",
-            "| 2  | 6  |",
-            "| 2  | 7  |",
-            "| 2  | 8  |",
-            "| 2  | 9  |",
-            "| 2  | 10 |",
-            "| 1  | 1  |",
-            "| 1  | 2  |",
-            "| 1  | 3  |",
-            "| 1  | 4  |",
-            "| 1  | 5  |",
-            "| 1  | 6  |",
-            "| 1  | 7  |",
-            "| 1  | 8  |",
-            "| 1  | 9  |",
-            "| 1  | 10 |",
-            "| 0  | 1  |",
-            "| 0  | 2  |",
-            "| 0  | 3  |",
-            "| 0  | 4  |",
-            "| 0  | 5  |",
-            "| 0  | 6  |",
-            "| 0  | 7  |",
-            "| 0  | 8  |",
-            "| 0  | 9  |",
-            "| 0  | 10 |",
-            "+----+----+",
-        ];
-        assert_batches_sorted_eq!(expected, &results);
-
-        Ok(())
-    }
-
     #[tokio::test]
     async fn create_variable_expr() -> Result<()> {
         let tmp_dir = TempDir::new()?;
@@ -1447,184 +1388,6 @@ mod tests {
         Ok(())
     }

-    #[tokio::test]
-    async fn parallel_query_with_filter() -> Result<()> {
-        let tmp_dir = TempDir::new()?;
-        let partition_count = 4;
-        let ctx = create_ctx(&tmp_dir, partition_count).await?;
-
-        let logical_plan =
-            ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")?;
-        let logical_plan = ctx.optimize(&logical_plan)?;
-
-        let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
-
-        let runtime = ctx.state.lock().runtime_env.clone();
-        let results = collect_partitioned(physical_plan, runtime).await?;
-
-        // note that the order of partitions is not deterministic
-        let mut num_rows = 0;
-        for partition in &results {
-            for batch in partition {
-                num_rows += batch.num_rows();
-            }
-        }
-        assert_eq!(20, num_rows);
-
-        let results: Vec<RecordBatch> = results.into_iter().flatten().collect();
-        let expected = vec![
-            "+----+----+",
-            "| c1 | c2 |",
-            "+----+----+",
-            "| 1  | 1  |",
-            "| 1  | 10 |",
-            "| 1  | 2  |",
-            "| 1  | 3  |",
-            "| 1  | 4  |",
-            "| 1  | 5  |",
-            "| 1  | 6  |",
-            "| 1  | 7  |",
-            "| 1  | 8  |",
-            "| 1  | 9  |",
-            "| 2  | 1  |",
-            "| 2  | 10 |",
-            "| 2  | 2  |",
-            "| 2  | 3  |",
-            "| 2  | 4  |",
-            "| 2  | 5  |",
-            "| 2  | 6  |",
-            "| 2  | 7  |",
-            "| 2  | 8  |",
-            "| 2  | 9  |",
-            "+----+----+",
-        ];
-        assert_batches_sorted_eq!(expected, &results);
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn projection_on_table_scan() -> Result<()> {
-        let tmp_dir = TempDir::new()?;
-        let partition_count = 4;
-        let ctx = create_ctx(&tmp_dir, partition_count).await?;
-        let runtime = ctx.state.lock().runtime_env.clone();
-
-        let table = ctx.table("test")?;
-        let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan())
-            .project(vec![col("c2")])?
-            .build()?;
-
-        let optimized_plan = ctx.optimize(&logical_plan)?;
-        match &optimized_plan {
-            LogicalPlan::Projection(Projection { input, .. }) => match &**input {
-                LogicalPlan::TableScan(TableScan {
-                    source,
-                    projected_schema,
-                    ..
-                }) => {
-                    assert_eq!(source.schema().fields().len(), 3);
-                    assert_eq!(projected_schema.fields().len(), 1);
-                }
-                _ => panic!("input to projection should be TableScan"),
-            },
-            _ => panic!("expect optimized_plan to be projection"),
-        }
-
-        let expected = "Projection: #test.c2\
-        \n  TableScan: test projection=Some([1])";
-        assert_eq!(format!("{:?}", optimized_plan), expected);
-
-        let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
-
-        assert_eq!(1, physical_plan.schema().fields().len());
-        assert_eq!("c2", physical_plan.schema().field(0).name().as_str());
-
-        let batches = collect(physical_plan, runtime).await?;
-        assert_eq!(40, batches.iter().map(|x| x.num_rows()).sum::<usize>());
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn preserve_nullability_on_projection() -> Result<()> {
-        let tmp_dir = TempDir::new()?;
-        let ctx = create_ctx(&tmp_dir, 1).await?;
-
-        let schema: Schema = ctx.table("test").unwrap().schema().clone().into();
-        assert!(!schema.field_with_name("c1")?.is_nullable());
-
-        let plan = LogicalPlanBuilder::scan_empty(None, &schema, None)?
-            .project(vec![col("c1")])?
-            .build()?;
-
-        let plan = ctx.optimize(&plan)?;
-        let physical_plan = ctx.create_physical_plan(&Arc::new(plan)).await?;
-        assert!(!physical_plan.schema().field_with_name("c1")?.is_nullable());
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn projection_on_memory_scan() -> Result<()> {
-        let schema = Schema::new(vec![
-            Field::new("a", DataType::Int32, false),
-            Field::new("b", DataType::Int32, false),
-            Field::new("c", DataType::Int32, false),
-        ]);
-        let schema = SchemaRef::new(schema);
-
-        let partitions = vec![vec![RecordBatch::try_new(
-            schema.clone(),
-            vec![
-                Arc::new(Int32Array::from_slice(&[1, 10, 10, 100])),
-                Arc::new(Int32Array::from_slice(&[2, 12, 12, 120])),
-                Arc::new(Int32Array::from_slice(&[3, 12, 12, 120])),
-            ],
-        )?]];
-
-        let plan = LogicalPlanBuilder::scan_memory(partitions, schema, None)?
-            .project(vec![col("b")])?
-            .build()?;
-        assert_fields_eq(&plan, vec!["b"]);
-
-        let ctx = ExecutionContext::new();
-        let optimized_plan = ctx.optimize(&plan)?;
-        match &optimized_plan {
-            LogicalPlan::Projection(Projection { input, .. }) => match &**input {
-                LogicalPlan::TableScan(TableScan {
-                    source,
-                    projected_schema,
-                    ..
-                }) => {
-                    assert_eq!(source.schema().fields().len(), 3);
-                    assert_eq!(projected_schema.fields().len(), 1);
-                }
-                _ => panic!("input to projection should be InMemoryScan"),
-            },
-            _ => panic!("expect optimized_plan to be projection"),
-        }
-
-        let expected = format!(
-            "Projection: #{}.b\
-            \n  TableScan: {} projection=Some([1])",
-            UNNAMED_TABLE, UNNAMED_TABLE
-        );
-        assert_eq!(format!("{:?}", optimized_plan), expected);
-
-        let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
-
-        assert_eq!(1, physical_plan.schema().fields().len());
-        assert_eq!("b", physical_plan.schema().field(0).name().as_str());
-
-        let runtime = ctx.state.lock().runtime_env.clone();
-        let batches = collect(physical_plan, runtime).await?;
-        assert_eq!(1, batches.len());
-        assert_eq!(1, batches[0].num_columns());
-        assert_eq!(4, batches[0].num_rows());
-
-        Ok(())
-    }
-
     #[tokio::test]
     async fn sort() -> Result<()> {
         let results =

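The five tests deleted above (parallel_projection, parallel_query_with_filter, projection_on_table_scan, preserve_nullability_on_projection, projection_on_memory_scan) all depended on the local execute/create_ctx CSV helpers. Per the commit title they move into the sql_integration test binary, which gets those helpers from the new partitioned_csv module added below. A relocated test would presumably take roughly this shape (a sketch assuming the moved code keeps its original assertions; this is not the literal committed test):

    use datafusion::error::Result;

    // Sketch only: `partitioned_csv` is the module registered in
    // datafusion/tests/sql/mod.rs below.
    #[tokio::test]
    async fn parallel_projection() -> Result<()> {
        let partition_count = 4;
        let results =
            partitioned_csv::execute("SELECT c1, c2 FROM test", partition_count).await?;

        // populate_csv_partitions writes 11 lines per partition (i = 0..=10), and
        // CsvReadOptions::new() defaults to has_header = true, so the first line is
        // consumed as a header: 4 partitions x 10 data rows = 40 rows, matching the
        // deleted test's expected output (c2 running from 1 to 10).
        let row_count: usize = results.iter().map(|batch| batch.num_rows()).sum();
        assert_eq!(40, row_count);
        Ok(())
    }
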
datafusion/tests/sql/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -98,6 +98,7 @@ pub mod window;

 mod explain;
 pub mod information_schema;
+mod partitioned_csv;
 #[cfg_attr(not(feature = "unicode_expressions"), ignore)]
 pub mod unicode;


datafusion/tests/sql/partitioned_csv.rs (new file)

Lines changed: 95 additions & 0 deletions
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utility functions for running with a partitioned csv dataset:
+
+use std::{io::Write, sync::Arc};
+
+use arrow::{
+    datatypes::{DataType, Field, Schema, SchemaRef},
+    record_batch::RecordBatch,
+};
+use datafusion::{
+    error::Result,
+    prelude::{CsvReadOptions, ExecutionConfig, ExecutionContext},
+};
+use tempfile::TempDir;
+
+/// Execute SQL and return results
+async fn plan_and_collect(
+    ctx: &mut ExecutionContext,
+    sql: &str,
+) -> Result<Vec<RecordBatch>> {
+    ctx.sql(sql).await?.collect().await
+}
+
+/// Execute SQL and return results
+pub async fn execute(sql: &str, partition_count: usize) -> Result<Vec<RecordBatch>> {
+    let tmp_dir = TempDir::new()?;
+    let mut ctx = create_ctx(&tmp_dir, partition_count).await?;
+    plan_and_collect(&mut ctx, sql).await
+}
+
+/// Generate CSV partitions within the supplied directory
+fn populate_csv_partitions(
+    tmp_dir: &TempDir,
+    partition_count: usize,
+    file_extension: &str,
+) -> Result<SchemaRef> {
+    // define schema for data source (csv file)
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::UInt32, false),
+        Field::new("c2", DataType::UInt64, false),
+        Field::new("c3", DataType::Boolean, false),
+    ]));
+
+    // generate a partitioned file
+    for partition in 0..partition_count {
+        let filename = format!("partition-{}.{}", partition, file_extension);
+        let file_path = tmp_dir.path().join(&filename);
+        let mut file = std::fs::File::create(file_path)?;
+
+        // generate some data
+        for i in 0..=10 {
+            let data = format!("{},{},{}\n", partition, i, i % 2 == 0);
+            file.write_all(data.as_bytes())?;
+        }
+    }
+
+    Ok(schema)
+}
+
+/// Generate a partitioned CSV file and register it with an execution context
+pub async fn create_ctx(
+    tmp_dir: &TempDir,
+    partition_count: usize,
+) -> Result<ExecutionContext> {
+    let mut ctx =
+        ExecutionContext::with_config(ExecutionConfig::new().with_target_partitions(8));
+
+    let schema = populate_csv_partitions(tmp_dir, partition_count, ".csv")?;
+
+    // register csv file with the execution context
+    ctx.register_csv(
+        "test",
+        tmp_dir.path().to_str().unwrap(),
+        CsvReadOptions::new().schema(&schema),
+    )
+    .await?;
+
+    Ok(ctx)
+}
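
As a usage sketch (hypothetical code, not part of this commit), a test that needs the ExecutionContext itself rather than just query results can call create_ctx directly and query the registered "test" table:

    use datafusion::error::Result;
    use tempfile::TempDir;

    #[tokio::test]
    async fn query_via_create_ctx() -> Result<()> {
        // create_ctx writes the partition files into tmp_dir and registers them
        // as table "test"; keep tmp_dir alive for the lifetime of the queries.
        let tmp_dir = TempDir::new()?;
        let mut ctx = partitioned_csv::create_ctx(&tmp_dir, 4).await?;

        let results = ctx
            .sql("SELECT c3, COUNT(*) FROM test GROUP BY c3")
            .await?
            .collect()
            .await?;
        assert!(!results.is_empty());
        Ok(())
    }

One quirk worth knowing when reading the generated directory: create_ctx passes ".csv" as file_extension while populate_csv_partitions inserts its own dot, so the files come out named partition-0..csv and so on; register_csv still picks them up because the names end in .csv.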
