Skip to content

Commit 5bdc880

Browse files
author
QP Hou
authored
round trip TPCH queries in tests (#630)
* honor table name for csv/parquet scan in ballista plan serde * disable query 7,8,9 in ballista integration test * add tpch query ballista roundtrip test * also round trip physical plan * fix clippy * simplify test code
1 parent c21106e commit 5bdc880

File tree

3 files changed

+88
-1
lines changed

3 files changed

+88
-1
lines changed

benchmarks/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,6 @@ futures = "0.3"
3939
env_logger = "^0.8"
4040
mimalloc = { version = "0.1", optional = true, default-features = false }
4141
snmalloc-rs = {version = "0.2", optional = true, features= ["cache-friendly"] }
42+
43+
[dev-dependencies]
44+
ballista-core = { path = "../ballista/rust/core" }

benchmarks/src/bin/tpch.rs

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,6 @@ mod tests {
573573

574574
use datafusion::arrow::array::*;
575575
use datafusion::arrow::util::display::array_value_to_string;
576-
577576
use datafusion::logical_plan::Expr;
578577
use datafusion::logical_plan::Expr::Cast;
579578

@@ -1042,4 +1041,88 @@ mod tests {
10421041

10431042
Ok(())
10441043
}
1044+
1045+
mod ballista_round_trip {
1046+
use super::*;
1047+
use ballista_core::serde::protobuf;
1048+
use datafusion::physical_plan::ExecutionPlan;
1049+
use std::convert::TryInto;
1050+
1051+
fn round_trip_query(n: usize) -> Result<()> {
1052+
let config = ExecutionConfig::new()
1053+
.with_concurrency(1)
1054+
.with_batch_size(10);
1055+
let mut ctx = ExecutionContext::with_config(config);
1056+
1057+
// set tpch_data_path to dummy value and skip physical plan serde test when TPCH_DATA
1058+
// is not set.
1059+
let tpch_data_path =
1060+
env::var("TPCH_DATA").unwrap_or_else(|_| "./".to_string());
1061+
1062+
for &table in TABLES {
1063+
let schema = get_schema(table);
1064+
let options = CsvReadOptions::new()
1065+
.schema(&schema)
1066+
.delimiter(b'|')
1067+
.has_header(false)
1068+
.file_extension(".tbl");
1069+
let provider = CsvFile::try_new(
1070+
&format!("{}/{}.tbl", tpch_data_path, table),
1071+
options,
1072+
)?;
1073+
ctx.register_table(table, Arc::new(provider))?;
1074+
}
1075+
1076+
// test logical plan round trip
1077+
let plan = create_logical_plan(&mut ctx, n)?;
1078+
let proto: protobuf::LogicalPlanNode = (&plan).try_into().unwrap();
1079+
let round_trip: LogicalPlan = (&proto).try_into().unwrap();
1080+
assert_eq!(
1081+
format!("{:?}", plan),
1082+
format!("{:?}", round_trip),
1083+
"logical plan round trip failed"
1084+
);
1085+
1086+
// test optimized logical plan round trip
1087+
let plan = ctx.optimize(&plan)?;
1088+
let proto: protobuf::LogicalPlanNode = (&plan).try_into().unwrap();
1089+
let round_trip: LogicalPlan = (&proto).try_into().unwrap();
1090+
assert_eq!(
1091+
format!("{:?}", plan),
1092+
format!("{:?}", round_trip),
1093+
"opitmized logical plan round trip failed"
1094+
);
1095+
1096+
// test physical plan roundtrip
1097+
if env::var("TPCH_DATA").is_ok() {
1098+
let physical_plan = ctx.create_physical_plan(&plan)?;
1099+
let proto: protobuf::PhysicalPlanNode =
1100+
(physical_plan.clone()).try_into().unwrap();
1101+
let round_trip: Arc<dyn ExecutionPlan> = (&proto).try_into().unwrap();
1102+
assert_eq!(
1103+
format!("{:?}", physical_plan),
1104+
format!("{:?}", round_trip),
1105+
"physical plan round trip failed"
1106+
);
1107+
}
1108+
1109+
Ok(())
1110+
}
1111+
1112+
macro_rules! test_round_trip {
1113+
($tn:ident, $query:expr) => {
1114+
#[test]
1115+
fn $tn() -> Result<()> {
1116+
round_trip_query($query)
1117+
}
1118+
};
1119+
}
1120+
1121+
test_round_trip!(q1, 1);
1122+
test_round_trip!(q3, 3);
1123+
test_round_trip!(q5, 5);
1124+
test_round_trip!(q6, 6);
1125+
test_round_trip!(q10, 10);
1126+
test_round_trip!(q12, 12);
1127+
}
10451128
}

datafusion/src/datasource/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ pub use self::csv::{CsvFile, CsvReadOptions};
2828
pub use self::datasource::{TableProvider, TableType};
2929
pub use self::memory::MemTable;
3030

31+
/// Source for table input data
3132
pub(crate) enum Source<R = Box<dyn std::io::Read + Send + Sync + 'static>> {
3233
/// Path to a single file or a directory containing one or more files
3334
Path(String),

0 commit comments

Comments
 (0)