
Commit 00aeea9

Improve metrics (#17)
* add first prepare phase tests
* make sure travis script stays in root dir
* add query after tests
* add query after tests
* add some mock listdir tests
* add after load test, improve after query tests
* fix after query test
* fix and improve logging of table count check
* introduced TestCommon class for sharing test code
* add check directory does not exist
* fix typo
* add/remove spaces, lines etc as suggested by PyCharm
* save load metrics
* unify standard output from load and query
* Fix test for get_json_files
* Fix test for load results json file in metrics
* Fix test for load results json file in metrics
1 parent ce34a7d commit 00aeea9

File tree

3 files changed: +119 -74 lines changed


benchmark.py

Lines changed: 73 additions & 53 deletions
@@ -8,8 +8,6 @@
 import math
 import subprocess
 import re
-import shutil
-from tempfile import mkstemp
 from datetime import datetime
 import glob
 import getpass
@@ -30,7 +28,7 @@
 QUERY_METRIC = "query_stream_%s_query_%s"
 REFRESH_METRIC = "refresh_stream_%s_func_%s"
 THROUGHPUT_TOTAL_METRIC = "throughput_test_total"
-QUERY_ORDER = [ # As given in appendix A of the TPCH-specification
+QUERY_ORDER = [  # As given in appendix A of the TPCH-specification
     [14, 2, 9, 20, 6, 17, 18, 8, 21, 13, 3, 22, 16, 4, 11, 15, 1, 10, 19, 5, 7, 12],
     [21, 3, 18, 5, 11, 7, 6, 20, 17, 12, 16, 15, 13, 10, 2, 8, 14, 19, 9, 22, 1, 4],
     [6, 17, 14, 16, 19, 10, 9, 2, 15, 8, 5, 22, 12, 7, 13, 18, 1, 4, 20, 3, 11, 21],
@@ -74,7 +72,6 @@
     [13, 15, 17, 1, 22, 11, 3, 4, 7, 20, 14, 21, 9, 8, 2, 18, 16, 6, 10, 12, 5, 19]
 ]
 NUM_QUERIES = len(QUERY_ORDER[0]) # 22
-
 ## End Constants
 
 
@@ -104,25 +101,31 @@ def stopTimer(self):
     def setMetric(self, name, value):
         self.__metrics__[name] = value
 
+    def printPadded(self, txt, width, fill='='):
+        space = ' '
+        w = int((width - len(txt) - 2 * len(space)) / 2)
+        x = len(txt) % 2 # extra fill char if needed
+        print(fill * w + space + txt + space + fill * x + fill * w)
+
     def printResultHeader(self, title):
         title = self.__title__ if not title else title
-        print("========================================")
-        l = int((40 - len(title))/2)
-        print(("="*l) + title + ("="*(l+1 if l%2 else l)))
-        print("========================================")
+        width = 60
+        print("="*width)
+        self.printPadded(title, width)
+        print("="*width)
 
     def printResultFooter(self):
         self.printResultHeader("End Results")
 
-    def printMetrics(self, title = None):
+    def printMetrics(self, title=None):
         self.printResultHeader(title)
         for key, value in self.__metrics__.items():
-            print("Time taken for %s: %s" % (key, value))
+            print("%s: %s" % (key, value))
         self.printResultFooter()
 
     def saveMetrics(self, run_timestamp, folder):
         path = os.path.join(RESULTS_DIR, run_timestamp, folder)
-        os.makedirs(path, exist_ok = True)
+        os.makedirs(path, exist_ok=True)
         metrics = dict()
         for key, value in self.__metrics__.items():
             metrics[key] = str(value)
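
The new printPadded helper is what gives the load, query, and metrics phases one shared 60-character header format. A minimal standalone sketch of the same logic (function name chosen here for illustration, not taken from the commit):

def print_padded(txt, width=60, fill='='):
    space = ' '
    w = int((width - len(txt) - 2 * len(space)) / 2)
    x = len(txt) % 2  # odd-length titles get one extra fill char so the line stays at full width
    print(fill * w + space + txt + space + fill * x + fill * w)

print_padded("Load")     # 27 '=' + ' Load ' + 27 '='      -> 60 characters
print_padded("Metrics")  # 25 '=' + ' Metrics ' + 1 + 25 '=' -> 60 characters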
@@ -131,10 +134,10 @@ def saveMetrics(self, run_timestamp, folder):
 
 
 class Password(argparse.Action):
-    def __call__(self, parser, namespace, values, option_string):
-        if values is None:
-            values = getpass.getpass()
-        setattr(namespace, self.dest, values)
+    def __call__(self, parser, namespace, values, option_string):
+        if values is None:
+            values = getpass.getpass()
+        setattr(namespace, self.dest, values)
 
 
 class PGDB:
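
The Password action falls back to an interactive getpass() prompt when the option is given without a value. The diff does not show how it is registered with the parser; a typical wiring, with a hypothetical flag name, would look like this:

import argparse
import getpass

class Password(argparse.Action):
    def __call__(self, parser, namespace, values, option_string):
        if values is None:
            values = getpass.getpass()  # no value on the command line -> prompt for it
        setattr(namespace, self.dest, values)

parser = argparse.ArgumentParser()
# nargs='?' allows a bare "-p" with no value, which reaches the action as values=None
parser.add_argument("-p", "--password", action=Password, nargs='?')
args = parser.parse_args(["-p", "s3cr3t"])  # a bare ["-p"] would trigger the getpass prompt
print(args.password)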
@@ -143,7 +146,8 @@ class PGDB:
 
     def __init__(self, host, port, db_name, user, password):
         # Exception handling is done by the method using this.
-        self.__connection__ = psycopg2.connect("host='%s' port='%s' dbname='%s' user='%s' password='%s'" % (host, port, db_name, user, password))
+        self.__connection__ = psycopg2.connect("host='%s' port='%s' dbname='%s' user='%s' password='%s'" %
+                                               (host, port, db_name, user, password))
         self.__cursor__ = self.__connection__.cursor()
 
     def close(self):
@@ -173,7 +177,7 @@ def executeQuery(self, query):
     def copyFrom(self, filepath, separator, table):
         if self.__cursor__ is not None:
             with open(filepath, 'r') as in_file:
-                self.__cursor__.copy_from(in_file, table = table, sep = separator)
+                self.__cursor__.copy_from(in_file, table=table, sep=separator)
             return 0
         else:
             print("database has been closed")
@@ -206,9 +210,10 @@ def build_dbgen(dbgen_dir):
     p.communicate()
     return p.returncode
 
+
 def inner_generate_data(data_dir, dbgen_dir, file_pattern, out_ext):
     try:
-        os.makedirs(data_dir, exist_ok = True)
+        os.makedirs(data_dir, exist_ok=True)
         for in_fname in glob.glob(os.path.join(dbgen_dir, file_pattern)):
             fname = os.path.basename(in_fname)
             out_fname = os.path.join(data_dir, fname + out_ext)
@@ -227,6 +232,7 @@ def inner_generate_data(data_dir, dbgen_dir, file_pattern, out_ext):
     ## All files written successfully. Return success code.
     return 0
 
+
 def generate_data(dbgen_dir, data_dir, scale, num_streams):
     """Generates data for the loading into tables.
 
@@ -241,7 +247,7 @@ def generate_data(dbgen_dir, data_dir, scale, num_streams):
     """
     p = subprocess.Popen([os.path.join(".", "dbgen"), "-vf", "-s", str(scale)], cwd = dbgen_dir)
     p.communicate()
-    if (not p.returncode):
+    if not p.returncode:
         load_dir = os.path.join(data_dir, LOAD_DIR)
         if inner_generate_data(load_dir, dbgen_dir, "*.tbl", ".csv"):
             print("unable to generate data for load phase")
@@ -255,7 +261,7 @@ def generate_data(dbgen_dir, data_dir, scale, num_streams):
     p = subprocess.Popen([os.path.join(".", "dbgen"), "-vf", "-s", str(scale), "-U", str(num_streams + 1)],
                          cwd = dbgen_dir)
     p.communicate()
-    if (not p.returncode):
+    if not p.returncode:
         update_dir = os.path.join(data_dir, UPDATE_DIR)
         delete_dir = os.path.join(data_dir, DELETE_DIR)
         if inner_generate_data(update_dir, dbgen_dir, "*.tbl.u*", ".csv"):
@@ -288,12 +294,12 @@ def generate_queries(dbgen_dir, query_root):
     query_env = os.environ.copy()
     query_env['DSS_QUERY'] = dss_query
     query_gen_dir = os.path.join(query_root, GENERATED_QUERY_DIR)
-    os.makedirs(query_gen_dir, exist_ok = True)
+    os.makedirs(query_gen_dir, exist_ok=True)
     for i in range(1, 23):
         try:
             with open(os.path.join(query_gen_dir, str(i) + ".sql"), "w") as out_file:
-                p = subprocess.Popen([os.path.join(".", "qgen"), str(i)], cwd = dbgen_dir,
-                                     env = query_env, stdout = out_file)
+                p = subprocess.Popen([os.path.join(".", "qgen"), str(i)],
+                                     cwd=dbgen_dir, env=query_env, stdout=out_file)
                 p.communicate()
                 if p.returncode:
                     print("Process returned non zero when generating query number %s" % i)
@@ -303,6 +309,7 @@ def generate_queries(dbgen_dir, query_root):
             return 1
     return p.returncode
 
+
 def clean_database(query_root, host, port, db_name, user, password):
     """Drops the tables if they exist
 
@@ -353,7 +360,7 @@ def create_schema(query_root, host, port, db_name, user, password):
         try:
             conn.executeQueryFromFile(os.path.join(query_root, PREP_QUERY_DIR, "create_tbl.sql"))
         except Exception as e:
-            print("unable to run create tables. %s" %e)
+            print("unable to run create tables. %s" % e)
             return 1
         conn.commit()
         conn.close()
@@ -382,7 +389,7 @@ def load_tables(query_root, data_dir, host, port, db_name, user, password):
         try:
             for table in TABLES:
                 filepath = os.path.join(data_dir, LOAD_DIR, table.lower() + ".tbl.csv")
-                conn.copyFrom(filepath, separator = "|", table = table)
+                conn.copyFrom(filepath, separator="|", table=table)
             conn.commit()
         except Exception as e:
             print("unable to run load tables. %s" %e)
@@ -393,6 +400,7 @@ def load_tables(query_root, data_dir, host, port, db_name, user, password):
         print("unable to connect to the database. %s" % e)
         return 1
 
+
 def index_tables(query_root, data_dir, host, port, db_name, user, password):
     """Creates indexes and foreign keys for loaded tables.
 
@@ -414,7 +422,7 @@ def index_tables(query_root, data_dir, host, port, db_name, user, password):
             conn.executeQueryFromFile(os.path.join(query_root, PREP_QUERY_DIR, "create_idx.sql"))
             conn.commit()
         except Exception as e:
-            print("unable to run index tables. %s" %e)
+            print("unable to run index tables. %s" % e)
             return 1
         conn.close()
         return 0
@@ -429,7 +437,8 @@ def grouper(iterable, n, fillvalue=None):
 
 
 def insert_lineitem(cols, conn):
-    li_insert_stmt = "INSERT INTO LINEITEM VALUES (%s, %s, %s, %s, %s, %s, %s, %s, '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s')" % cols
+    li_insert_stmt = """INSERT INTO LINEITEM VALUES (%s, %s, %s, %s, %s, %s, %s, %s, '%s',
+        '%s', '%s', '%s', '%s', '%s', '%s', '%s')""" % cols
     conn.executeQuery(li_insert_stmt)
 
 
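For context on insert_lineitem: the 16 placeholders follow the LINEITEM column order of the TPC-H schema, where the first eight columns are numeric (keys, quantity, price, discount, tax) and the remaining eight are character or date columns, which is why only the latter are quoted. An illustrative call with made-up values:

# LINEITEM column order (TPC-H schema): orderkey, partkey, suppkey, linenumber,
# quantity, extendedprice, discount, tax, returnflag, linestatus, shipdate,
# commitdate, receiptdate, shipinstruct, shipmode, comment.
cols = (1, 155190, 7706, 1, 17, 21168.23, 0.04, 0.02,
        "N", "O", "1996-03-13", "1996-02-12", "1996-03-22",
        "DELIVER IN PERSON", "TRUCK", "illustrative comment text")
li_insert_stmt = """INSERT INTO LINEITEM VALUES (%s, %s, %s, %s, %s, %s, %s, %s, '%s',
    '%s', '%s', '%s', '%s', '%s', '%s', '%s')""" % cols
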
@@ -472,7 +481,7 @@ def refresh_func1(conn, data_dir, stream, num_streams, verbose):
         conn.commit()
         return 0
     except Exception as e:
-        print("refresh function 1 failed. %s" %e)
+        print("refresh function #1 failed. %s" % e)
         return 1
 
 
@@ -489,7 +498,7 @@ def refresh_func2(conn, data_dir, stream, num_streams, verbose):
         conn.commit()
         return 0
     except Exception as e:
-        print("refresh function 1 failed. %s" %e)
+        print("refresh function #2 failed. %s" % e)
         return 1
 
 
@@ -610,12 +619,6 @@ def run_throughput_test(query_root, data_dir, host, port, db_name, user, passwor
         return 1
 
 
-def niceprint(txt, width):
-    w = round((width - len(txt) - 2) / 2)
-    x = len(txt) % 2 # extra space if needed
-    print("*"*w + " " + txt + " " + " "*x + "*"*w)
-
-
 def scale_to_num_streams(scale):
     num_streams = 2
     if scale <= 1:
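
Only the first lines of scale_to_num_streams appear in this hunk; it derives the throughput-test stream count from the scale factor, which in TPC-H is governed by the spec's minimum-stream table (SF 1 -> 2 streams, SF 10 -> 3, SF 30 -> 4, SF 100 -> 5, and so on). A hedged sketch of that mapping, not the exact body of the helper:

def scale_to_num_streams(scale):
    # Minimum number of query streams per the TPC-H spec, keyed by scale factor in GB.
    thresholds = [(1, 2), (10, 3), (30, 4), (100, 5), (300, 6), (1000, 7), (3000, 8), (10000, 9)]
    for sf, streams in thresholds:
        if scale <= sf:
            return streams
    return 10  # still larger scale factors require more streams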
@@ -651,7 +654,9 @@ def get_json_files(path):
651654
json_files = []
652655
for run_timestamp in os.listdir(os.path.join(path)):
653656
for mode in [POWER, THROUGHPUT]:
654-
json_files += get_json_files_from(os.path.join(path, run_timestamp, mode))
657+
sub_dir = os.path.join(path, run_timestamp, mode)
658+
if os.path.exists(sub_dir) and os.path.isdir(sub_dir):
659+
json_files += get_json_files_from(sub_dir)
655660
return json_files
656661

657662

@@ -682,6 +687,7 @@ def get_average(results, metric_name):
682687
avg = sum(seconds) / len(values)
683688
return avg
684689

690+
685691
def qi(results, i, s): # execution time for query Qi within the query stream s
686692
# i is the ordering number of the query ranging from 1 to 22
687693
# s is 0 for the power function and the position of the query stream for the throughput test
@@ -694,34 +700,35 @@ def qi(results, i, s): # execution time for query Qi within the query stream s
 
 def ri(results, j, s): # execution time for the refresh function RFi within a refresh stream s
     # j is the ordering function of the refresh function ranging from 1 to 2
-    # s is 0 for the power function and the position of the pair of refresh functions in the stream for the throughput test
+    # s is 0 for the power function and the position of the pair of refresh functions
+    # in the stream for the throughput test
     assert(j == 1 or j == 2)
     assert(0 <= s)
     metric_name = REFRESH_METRIC % (s, j)
     ret = get_average(results, metric_name)
     return ret
 
 
-def ts(results): # total time needed to execute the throughput test
+def ts(results):  # total time needed to execute the throughput test
     metric_name = THROUGHPUT_TOTAL_METRIC
     ret = get_average(results, metric_name)
     return ret
 
 
-def get_power_size(results, scale, num_streams):
+def get_power_size(results, scale_factor):
     qi_product = 1
     for i in range(1, NUM_QUERIES + 1):
         qi_product *= qi(results, i, 0)
     ri_product = 1
-    for j in [1, 2]: # two refresh functions
+    for j in [1, 2]:  # two refresh functions
         ri_product *= ri(results, j, 0)
     denominator = math.pow(qi_product * ri_product, 1/24)
-    power_size = (3600 / denominator) * scale
+    power_size = (3600 / denominator) * scale_factor
     return power_size
 
 
-def get_throughput_size(results, scale, num_streams):
-    throughput_size = ( ( num_streams * NUM_QUERIES ) / ts(results) ) * 3600 * scale
+def get_throughput_size(results, scale_factor, num_streams):
+    throughput_size = ((num_streams * NUM_QUERIES) / ts(results)) * 3600 * scale_factor
     return throughput_size
 
 
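For reference, these helpers (together with get_qphh_size in the next hunk, whose body is not shown in this diff) compute the TPC-H composite metrics. With $QI(i,0)$ and $RI(j,0)$ the power-test timings in seconds, $S$ the number of query streams, $T_s$ the elapsed throughput-test time, and $SF$ the scale factor, the code above corresponds to:

$$\mathrm{Power@Size} = \frac{3600 \cdot SF}{\sqrt[24]{\prod_{i=1}^{22} QI(i,0) \cdot \prod_{j=1}^{2} RI(j,0)}}$$

$$\mathrm{Throughput@Size} = \frac{S \cdot 22 \cdot 3600}{T_s} \cdot SF
\qquad
\mathrm{QphH@Size} = \sqrt{\mathrm{Power@Size} \cdot \mathrm{Throughput@Size}}$$

(The QphH@Size geometric mean is the definition from the TPC-H specification, not something visible in this hunk.)
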
@@ -730,20 +737,32 @@ def get_qphh_size(power_size, throughput_size):
     return qphh_size
 
 
-def metrics(scale, num_streams):
+def calc_metrics(run_timestamp, scale_factor, num_streams):
     results = load_results()
-    power_size = get_power_size(results, scale, num_streams)
+    r = Result("Metric")
+    #
+    power_size = get_power_size(results, scale_factor)
+    r.setMetric("power_size", power_size)
     print("Power@Size = %s" % power_size)
-    throughput_size = get_throughput_size(results, scale, num_streams)
+    #
+    throughput_size = get_throughput_size(results, scale_factor, num_streams)
+    r.setMetric("throughput_size", throughput_size)
     print("Throughput@Size = %s" % throughput_size)
+    #
     qphh_size = get_qphh_size(power_size, throughput_size)
+    r.setMetric("qphh_size", qphh_size)
     print("QphH@Size = %s" % qphh_size)
+    #
+    r.printMetrics("Metrics")
+    r.saveMetrics(run_timestamp, "metrics")
+
 
 
 def main(phase, host, port, user, password, database, data_dir, query_root, dbgen_dir,
          scale, num_streams, verbose, read_only):
     if num_streams == 0:
         num_streams = scale_to_num_streams(scale)
+    run_timestamp = "run_%s" % time.strftime("%Y%m%d_%H%M%S", time.gmtime())
     if phase == "prepare":
         ## try to build dbgen from source and quit if failed
         if build_dbgen(dbgen_dir):
@@ -760,7 +779,7 @@ def main(phase, host, port, user, password, database, data_dir, query_root, dbge
             exit(1)
         print("created query files in %s" % query_root)
     elif phase == "load":
-        result = Result("Load Results")
+        result = Result("Load")
         if clean_database(query_root, host, port, database, user, password):
             print("could not clean the database.")
             exit(1)
@@ -784,8 +803,8 @@ def main(phase, host, port, user, password, database, data_dir, query_root, dbge
         result.setMetric("index_tables", result.stopTimer())
         print("done creating indexes and foreign keys")
         result.printMetrics()
+        result.saveMetrics(run_timestamp, "load")
     elif phase == "query":
-        run_timestamp = "run_%s" % time.strftime("%Y%m%d_%H%M%S", time.gmtime())
         if run_power_test(query_root, data_dir, host, port, database, user, password,
                           run_timestamp, num_streams, verbose, read_only):
             print("running power test failed")
@@ -796,11 +815,12 @@ def main(phase, host, port, user, password, database, data_dir, query_root, dbge
             print("running throughput test failed")
             exit(1)
         print("done performance test")
-        metrics(scale, num_streams)
+        calc_metrics(run_timestamp, scale, num_streams)
 
 
 if __name__ == "__main__":
-    parser = argparse.ArgumentParser(description = "PGTPCH")
+    parser = argparse.ArgumentParser(description="PGTPCH")
+
     default_host = "localhost"
     default_port = 5432
     default_username = "postgres"
@@ -833,12 +853,13 @@ def main(phase, host, port, user, password, database, data_dir, query_root, dbge
     parser.add_argument("-s", "--scale", type=float, default=default_scale,
                         help="Size of the data generated, scale factor; default is %s = 1GB" % default_scale)
     parser.add_argument("-n", "--num-streams", type=int, default=default_num_streams,
-                        help="Number of streams to run the throughput test with; default is %s" % default_num_streams +
+                        help="Number of streams to run the throughput test with; default is %s" % default_num_streams +
                              ", i.e. based on scale factor SF")
     parser.add_argument("-b", "--verbose", action="store_true",
                         help="Print more information to standard output")
     parser.add_argument("-r", "--read-only", action="store_true",
-                        help="Do not execute refresh functions during the query phase, which allows for running it repeatedly")
+                        help="Do not execute refresh functions during the query phase, " +
+                             "which allows for running it repeatedly")
     args = parser.parse_args()
 
     ## Extract all arguments into variables
@@ -858,4 +879,3 @@ def main(phase, host, port, user, password, database, data_dir, query_root, dbge
 
     ## main
     main(phase, host, port, user, password, database, data_dir, query_root, dbgen_dir, scale, num_streams, verbose, read_only)
-
