@@ -54,7 +54,7 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
5454static ALLOC : mimalloc:: MiMalloc = mimalloc:: MiMalloc ;
5555
5656#[ derive( Debug , StructOpt , Clone ) ]
57- struct BenchmarkOpt {
57+ struct BallistaBenchmarkOpt {
5858 /// Query number
5959 #[ structopt( short, long) ]
6060 query : usize ,
@@ -67,10 +67,6 @@ struct BenchmarkOpt {
6767 #[ structopt( short = "i" , long = "iterations" , default_value = "3" ) ]
6868 iterations : usize ,
6969
70- /// Number of threads to use for parallel execution
71- #[ structopt( short = "c" , long = "concurrency" , default_value = "2" ) ]
72- concurrency : usize ,
73-
7470 /// Batch size when reading CSV or Parquet files
7571 #[ structopt( short = "s" , long = "batch-size" , default_value = "8192" ) ]
7672 batch_size : usize ,
@@ -100,6 +96,45 @@ struct BenchmarkOpt {
10096 port : Option < u16 > ,
10197}
10298
99+ #[ derive( Debug , StructOpt , Clone ) ]
100+ struct DataFusionBenchmarkOpt {
101+ /// Query number
102+ #[ structopt( short, long) ]
103+ query : usize ,
104+
105+ /// Activate debug mode to see query results
106+ #[ structopt( short, long) ]
107+ debug : bool ,
108+
109+ /// Number of iterations of each test run
110+ #[ structopt( short = "i" , long = "iterations" , default_value = "3" ) ]
111+ iterations : usize ,
112+
113+ /// Number of threads to use for parallel execution
114+ #[ structopt( short = "c" , long = "concurrency" , default_value = "2" ) ]
115+ concurrency : usize ,
116+
117+ /// Batch size when reading CSV or Parquet files
118+ #[ structopt( short = "s" , long = "batch-size" , default_value = "8192" ) ]
119+ batch_size : usize ,
120+
121+ /// Path to data files
122+ #[ structopt( parse( from_os_str) , required = true , short = "p" , long = "path" ) ]
123+ path : PathBuf ,
124+
125+ /// File format: `csv` or `parquet`
126+ #[ structopt( short = "f" , long = "format" , default_value = "csv" ) ]
127+ file_format : String ,
128+
129+ /// Load the data into a MemTable before executing the query
130+ #[ structopt( short = "m" , long = "mem-table" ) ]
131+ mem_table : bool ,
132+
133+ /// Number of partitions to create when using MemTable as input
134+ #[ structopt( short = "n" , long = "partitions" , default_value = "8" ) ]
135+ partitions : usize ,
136+ }
137+
103138#[ derive( Debug , StructOpt ) ]
104139struct ConvertOpt {
105140 /// Path to csv files
@@ -127,10 +162,19 @@ struct ConvertOpt {
127162 batch_size : usize ,
128163}
129164
165+ #[ derive( Debug , StructOpt ) ]
166+ #[ structopt( about = "benchmark command" ) ]
167+ enum BenchmarkSubCommandOpt {
168+ #[ structopt( name = "ballista" ) ]
169+ BallistaBenchmark ( BallistaBenchmarkOpt ) ,
170+ #[ structopt( name = "datafusion" ) ]
171+ DataFusionBenchmark ( DataFusionBenchmarkOpt ) ,
172+ }
173+
130174#[ derive( Debug , StructOpt ) ]
131175#[ structopt( name = "TPC-H" , about = "TPC-H Benchmarks." ) ]
132176enum TpchOpt {
133- Benchmark ( BenchmarkOpt ) ,
177+ Benchmark ( BenchmarkSubCommandOpt ) ,
134178 Convert ( ConvertOpt ) ,
135179}
136180
@@ -140,20 +184,21 @@ const TABLES: &[&str] = &[
140184
141185#[ tokio:: main]
142186async fn main ( ) -> Result < ( ) > {
187+ use BenchmarkSubCommandOpt :: * ;
188+
143189 env_logger:: init ( ) ;
144190 match TpchOpt :: from_args ( ) {
145- TpchOpt :: Benchmark ( opt) => {
146- if opt. host . is_some ( ) && opt. port . is_some ( ) {
147- benchmark_ballista ( opt) . await . map ( |_| ( ) )
148- } else {
149- benchmark_datafusion ( opt) . await . map ( |_| ( ) )
150- }
191+ TpchOpt :: Benchmark ( BallistaBenchmark ( opt) ) => {
192+ benchmark_ballista ( opt) . await . map ( |_| ( ) )
193+ }
194+ TpchOpt :: Benchmark ( DataFusionBenchmark ( opt) ) => {
195+ benchmark_datafusion ( opt) . await . map ( |_| ( ) )
151196 }
152197 TpchOpt :: Convert ( opt) => convert_tbl ( opt) . await ,
153198 }
154199}
155200
156- async fn benchmark_datafusion ( opt : BenchmarkOpt ) -> Result < Vec < RecordBatch > > {
201+ async fn benchmark_datafusion ( opt : DataFusionBenchmarkOpt ) -> Result < Vec < RecordBatch > > {
157202 println ! ( "Running benchmarks with the following options: {:?}" , opt) ;
158203 let config = ExecutionConfig :: new ( )
159204 . with_concurrency ( opt. concurrency )
@@ -204,7 +249,7 @@ async fn benchmark_datafusion(opt: BenchmarkOpt) -> Result<Vec<RecordBatch>> {
204249 Ok ( result)
205250}
206251
207- async fn benchmark_ballista ( opt : BenchmarkOpt ) -> Result < ( ) > {
252+ async fn benchmark_ballista ( opt : BallistaBenchmarkOpt ) -> Result < ( ) > {
208253 println ! ( "Running benchmarks with the following options: {:?}" , opt) ;
209254
210255 let mut settings = HashMap :: new ( ) ;
@@ -956,7 +1001,7 @@ mod tests {
9561001 let expected = df. collect ( ) . await ?;
9571002
9581003 // run the query to compute actual results of the query
959- let opt = BenchmarkOpt {
1004+ let opt = DataFusionBenchmarkOpt {
9601005 query : n,
9611006 debug : false ,
9621007 iterations : 1 ,
@@ -966,8 +1011,6 @@ mod tests {
9661011 file_format : "tbl" . to_string ( ) ,
9671012 mem_table : false ,
9681013 partitions : 16 ,
969- host : None ,
970- port : None ,
9711014 } ;
9721015 let actual = benchmark_datafusion ( opt) . await ?;
9731016
0 commit comments