1+ import argparse
2+ import subprocess
3+ import pathlib
4+ import os
5+ from sys import stderr
6+
7+
def variant(string):
    """Validate the benchmark variant given on the command line.

    Used as an argparse ``type=`` converter.

    Args:
        string: Raw option value.

    Returns:
        ``string`` unchanged when it is ``"h"`` or ``"ds"``.

    Raises:
        ValueError: For any other value.
    """
    if string in ("h", "ds"):
        return string
    raise ValueError("variant must be h or ds")
12+
def paths(string):
    """Split a ``;``-separated string into a list of ``pathlib.Path`` objects.

    Used as an argparse ``type=`` converter. Every piece produced by the
    split becomes one ``Path``, including empty pieces.
    """
    return [pathlib.Path(piece) for piece in string.split(";")]
15+
def parse_args(passed=None):
    """Parse the benchmark driver's command-line options.

    Args:
        passed: Optional list of argument strings; ``None`` makes argparse
            read ``sys.argv``.

    Returns:
        The ``argparse.Namespace`` with the parsed options.
    """
    parser = argparse.ArgumentParser()

    # Benchmark shape: (flag, converter, default).
    shape_options = (
        ('--datasize', int, 1),
        ('--variant', variant, 'h'),
        ('--tasks', int, 1),
    )
    for flag, converter, fallback in shape_options:
        parser.add_argument(flag, type=converter, default=fallback)

    # Locations of external tools and configs: (flag, converter).
    # No defaults, so an omitted option is left as None.
    location_options = (
        ('--dqrun', pathlib.Path),
        ('--gen-queries', pathlib.Path),
        ('--downloaders-dir', pathlib.Path),
        ('--udfs-dir', paths),  # ';'-separated list of directories
        ('--fs-cfg', pathlib.Path),
        ('--flame-graph', pathlib.Path),
        ('--result-compare', pathlib.Path),
        ('--gateways-cfg', pathlib.Path),
        ('--runner-path', pathlib.Path),
    )
    for flag, converter in location_options:
        parser.add_argument(flag, type=converter)

    # Output location and run behaviour.
    parser.add_argument('-o', '--output', default="./results")
    parser.add_argument('--clean-old', action="store_true", default=False)
    parser.add_argument('--query-filter', action="append", default=[])

    return parser.parse_args(passed)
39+
class Runner:
    """Prepare inputs for, and launch, one benchmark pass.

    One instance corresponds to one spilling mode (enabled or disabled).
    Constructing an instance can already run external commands: the query
    generator and the dataset download script are invoked when their
    target directories are missing or when ``args.clean_old`` is set.
    """

    def __init__(self, args, enable_spilling):
        """Set up the queries, dataset and result directories.

        Args:
            args: Parsed command-line namespace (see ``parse_args``).
            enable_spilling: Whether this pass runs with spilling enabled.
        """
        self.args = args
        self.enable_spilling = enable_spilling

        # Computed outside the f-string: nesting the same quote type inside
        # an f-string only parses on Python 3.12+.
        spilling_sign = "+" if self.enable_spilling else "-"
        self.queries_dir = pathlib.Path(
            f"queries{spilling_sign}spilling-{args.datasize}-{args.tasks}"
        ).resolve()
        if self.args.clean_old or not self.queries_dir.exists():
            self.prepare_queries_dir(self._query_pragmas())

        self.tpc_dir = pathlib.Path(
            f"{self.args.downloaders_dir}/tpc/{self.args.variant}/{self.args.datasize}"
        ).resolve()
        if self.args.clean_old or not self.tpc_dir.exists():
            self.prepare_tpc_dir()
        # Expose the downloaded data as ./tpc in the working directory.
        tpc_link = pathlib.Path("./tpc")
        if not tpc_link.exists():
            os.symlink(f"{self.args.downloaders_dir}/tpc", str(tpc_link), target_is_directory=True)

        spilling_label = "with" if self.enable_spilling else "no"
        self.result_dir = pathlib.Path(
            f"{self.args.output}/{spilling_label}-spilling/"
            f"{args.variant}-{args.datasize}-{args.tasks}"
        ).resolve()
        self.result_dir.mkdir(parents=True, exist_ok=True)

    def _query_pragmas(self):
        """Return the pragmas to embed into the generated queries.

        The task limit and LLVM pragmas apply to every pass; the spilling
        pragmas are appended only when spilling is enabled. Previously the
        expression ``base + extra if cond else []`` was used, which Python
        parses as ``(base + extra) if cond else []`` and therefore dropped
        the common pragmas for the no-spilling pass.
        """
        pragmas = [
            f"dq.MaxTasksPerStage={self.args.tasks}",
            "dq.OptLLVM=ON",
        ]
        if self.enable_spilling:
            pragmas += [
                "dq.UseFinalizeByKey=true",
                "dq.EnableSpillingNodes=All",
            ]
        return pragmas

    def prepare_queries_dir(self, custom_pragmas):
        """Run the query generator into ``self.queries_dir``.

        Args:
            custom_pragmas: Iterable of pragma strings, each passed as a
                separate ``--pragma`` argument.

        The generator's exit status is not checked.
        """
        print("Preparing queries...", file=stderr)
        self.queries_dir.mkdir(parents=True, exist_ok=True)
        print("queries dir: ", self.queries_dir.resolve(), file=stderr)
        cmd = [str(self.args.gen_queries)]
        cmd += ["--output", f"{self.queries_dir}"]
        cmd += ["--variant", f"{self.args.variant}"]
        cmd += ["--syntax", "yql"]
        cmd += ["--dataset-size", f"{self.args.datasize}"]
        for pragma in custom_pragmas:
            cmd += ["--pragma", pragma]
        print(cmd, file=stderr)
        subprocess.run(cmd)

    def prepare_tpc_dir(self):
        """Run the download script matching the variant and data size.

        The script is looked up in ``args.downloaders_dir``; its exit status
        is not checked.
        """
        print("Preparing tpc...", file=stderr)
        cmd = [
            f"{self.args.downloaders_dir}/download_files_{self.args.variant}_{self.args.datasize}.sh"
        ]
        print(cmd, file=stderr)
        subprocess.run(cmd)

    def run(self):
        """Launch the benchmark runner under ``/usr/bin/time``.

        Returns:
            ``self.result_dir``, the directory passed to the runner as
            ``--result-dir``. The runner's exit status is not checked.
        """
        variant_dir = f"{self.queries_dir}/{self.args.variant}"

        cmd = ["/usr/bin/time", str(self.args.runner_path)]
        cmd += ["--perf"]
        for query in self.args.query_filter:
            cmd += ["--include-q", query]
        cmd += ["--query-dir", variant_dir]
        cmd += ["--bindings", f"{variant_dir}/bindings.json"]
        cmd += ["--result-dir", str(self.result_dir)]
        cmd += ["--flame-graph", str(self.args.flame_graph)]
        # Everything from here on is the dqrun command line and its options.
        cmd += [f"{self.args.dqrun}", "-s"]
        if self.enable_spilling:
            cmd += ["--enable-spilling"]
        cmd += ["--udfs-dir", ";".join(map(str, self.args.udfs_dir))]
        cmd += ["--fs-cfg", str(self.args.fs_cfg)]
        cmd += ["--gateways-cfg", str(self.args.gateways_cfg)]
        subprocess.run(cmd)

        return self.result_dir
101+
def result_compare(args, to_compare):
    """Run the comparison tool and save its stdout as an ``.htm`` report.

    The command is ``args.result_compare -v <to_compare...>``. Its standard
    output is written to
    ``<args.output>/result-<variant>-<datasize>-<tasks>.htm``. The tool's
    exit status is not checked.

    Args:
        args: Namespace providing ``result_compare``, ``output``,
            ``variant``, ``datasize`` and ``tasks``.
        to_compare: List of arguments appended to the command (the result
            directories to compare).
    """
    cmd = [f"{args.result_compare}", "-v"]
    cmd += to_compare
    print(cmd, file=stderr)

    # No stray spaces inside the literal: they would end up in the path.
    report_path = f"{args.output}/result-{args.variant}-{args.datasize}-{args.tasks}.htm"
    with open(report_path, "w") as result_table:
        subprocess.run(cmd, stdout=result_table)
109+
def run(passed=None):
    """Run the benchmark with and then without spilling and compare results.

    Args:
        passed: Optional argument list forwarded to ``parse_args``.
    """
    args = parse_args(passed)

    print(args.query_filter)

    # The spilling pass comes first; the result directories are collected
    # in that same order.
    passes = (
        ("With spilling...", True),
        ("No spilling...", False),
    )
    results = []
    for banner, spilling in passes:
        print(banner, file=stderr)
        results.append(Runner(args, spilling).run())

    print(results, file=stderr)

    result_compare(args, results)
124+
def main():
    """Script entry point: run the benchmark using ``sys.argv`` options."""
    run()
127+
# Only start the benchmark when executed as a script, not when imported.
if __name__ == "__main__":
    main()
0 commit comments