
Commit 1745d2b

Getting my fork in sync with upstream (#10)
* updated documentation and help
* Update README.md
* fix typos in args help
* add travis ci
* fix ci build to return exit code 0
* add prepare step to ci build
* create user and db
* create user and db in before_script
* fix psql command arg
* use postgres version 9.6 for ci build
* fix warning for psycopg2
* set password in travis settings
* add query phase to ci build
* Use DSP travis ci build status
* change data size from 1GB to 10MB
* move env var with password from travis settings to yml
* move psycopg2-binary from ci yml to requirements
* minor changes in README.md
* minor changes in README.md
* Remove psycopg2 from requirements (remove sources, we have psycopg2-binary in there now)
* added throughput tests
* avoid plus one
* add reboot functionality
* refactor code, run twice and add verbose and read-only options
* set number of streams based on scale factor per default
* update README.md
* skeleton for metrics
* reuse code
* load json from result files
* remove folder tree for results before running perf tests
* implemented metrics formulas
* added TODOs
* fix bug
* remove unnecessary print statements
* python 3.6 needed for dict syntax
* sudo is needed for restarting postgresql and clearing os caches
* fix bug in print result header/footer
* do not skip running perf tests
* remove results folder in the beginning
* add throughput total time
* do not version control results folder
* change from two runs to only one
* move Result outside of the loop
* merge metric and no reboot
* fix merge problem
1 parent 73f35f2 commit 1745d2b

File tree

1 file changed: +56 additions, −81 deletions

benchmark.py

Lines changed: 56 additions & 81 deletions
@@ -69,7 +69,6 @@
     [13, 15, 17, 1, 22, 11, 3, 4, 7, 20, 14, 21, 9, 8, 2, 18, 16, 6, 10, 12, 5, 19]
 ]
 NUM_QUERIES = len(QUERY_ORDER[0]) # 22
-NUM_RUNS = 2 # as per TPC-H spec, the test should run twice, with a reboot between them
 
 ## End Constants
 
@@ -248,8 +247,7 @@ def generate_data(dbgen_dir, data_dir, scale, num_streams):
 
     ## Update/Delete phase data
     ## we generate num_streams + 1 number of updates because 1 is used by the power test
-    ## and multiplied by 2 because there are two runs
-    p = subprocess.Popen(["./dbgen", "-vf", "-s", str(scale), "-U", str(2 * (num_streams + 1))],
+    p = subprocess.Popen(["./dbgen", "-vf", "-s", str(scale), "-U", str(num_streams + 1)],
                          cwd = dbgen_dir)
     p.communicate()
     if (not p.returncode):
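
For example, at scale factor 1 with num_streams = 2, the new flag value asks dbgen for three update/delete sets: one for the power test plus one per throughput stream. An illustrative invocation (the scale factor here is assumed; the flags are taken from the code above):

    ./dbgen -vf -s 1 -U 3
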
@@ -430,12 +428,11 @@ def insert_lineitem(cols, conn):
     conn.executeQuery(li_insert_stmt)
 
 
-def refresh_func1(conn, data_dir, run, stream, num_streams, verbose):
+def refresh_func1(conn, data_dir, stream, num_streams, verbose):
     try:
         if verbose:
-            print("Running refresh function #1 in run #%s stream #%s" % (run, stream))
+            print("Running refresh function #1 in stream #%s" % stream)
         file_nr = stream + 1 # generated files are named 1,2,3,... while streams are indexed 0,1,2,...
-        file_nr += run * (num_streams+1) # and we have two runs
         filepath_o = os.path.join(data_dir, UPDATE_DIR, "orders.tbl.u" + str(file_nr) + ".csv")
         filepath_l = os.path.join(data_dir, UPDATE_DIR, "lineitem.tbl.u" + str(file_nr) + ".csv")
         with open(filepath_o) as orders_file, open(filepath_l) as lineitem_file:
@@ -474,12 +471,11 @@ def refresh_func1(conn, data_dir, run, stream, num_streams, verbose):
         return 1
 
 
-def refresh_func2(conn, data_dir, run, stream, num_streams, verbose):
+def refresh_func2(conn, data_dir, stream, num_streams, verbose):
     try:
         if verbose:
-            print("Running refresh function #2 in run #%s stream #%s" % (run, stream))
+            print("Running refresh function #2 in stream #%s" % stream)
         file_nr = stream + 1
-        file_nr += run * (num_streams+1)
         filepath = os.path.join(data_dir, DELETE_DIR, "delete." + str(file_nr) + ".csv")
         with open(filepath, 'r') as in_file:
             for ids in grouper(in_file, 100, ''):
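
A quick worked check of the simplified numbering (an illustrative sketch, not part of the diff): the power test runs as stream 0 and the throughput streams as 1..num_streams, so file_nr = stream + 1 maps each stream onto exactly one of the num_streams + 1 update sets dbgen generates above.

    # illustrative only, assuming num_streams = 2
    num_streams = 2
    for stream in range(num_streams + 1):  # 0 = power test, 1..2 = throughput streams
        file_nr = stream + 1               # update files are named 1, 2, 3, ...
        print("stream %d -> orders.tbl.u%d.csv" % (stream, file_nr))
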
@@ -492,73 +488,73 @@ def refresh_func2(conn, data_dir, run, stream, num_streams, verbose):
         return 1
 
 
-def run_query_stream(conn, query_root, run, stream, num_streams, result, verbose):
-    index = (run * (num_streams+1) + stream) % len(QUERY_ORDER)
+def run_query_stream(conn, query_root, stream, num_streams, result, verbose):
+    index = stream % len(QUERY_ORDER)
     order = QUERY_ORDER[index]
     for i in range(0, 22):
         try:
             if verbose:
-                print("Running query #%s in run #%s stream #%s ..." % (order[i], run, stream))
+                print("Running query #%s in stream #%s ..." % (order[i], stream))
             filepath = os.path.join(query_root, GENERATED_QUERY_DIR, str(order[i]) + ".sql")
             result.startTimer()
             conn.executeQueryFromFile(filepath)
-            result.setMetric("run_%s_stream_%s_query_%s" % (run, stream, order[i]), result.stopTimer())
+            result.setMetric("query_stream_%s_query_%s" % (stream, order[i]), result.stopTimer())
         except Exception as e:
-            print("unable to execute query %s in run %s stream %s: %s" % (order[i], run, stream, e))
+            print("unable to execute query %s in stream %s: %s" % (order[i], stream, e))
             return 1
     return 0
 
 
 def run_power_test(query_root, data_dir, host, port, db_name, user, password,
-                   run, num_streams, verbose, read_only):
+                   num_streams, verbose, read_only):
     try:
-        print("Power test run #%s started ..." % run)
+        print("Power test started ...")
         conn = PGDB(host, port, db_name, user, password)
         result = Result("Power")
         result.startTimer()
         stream = 0 # constant for power test
         #
         if not read_only:
-            if refresh_func1(conn, data_dir, run, stream, num_streams, verbose):
+            if refresh_func1(conn, data_dir, stream, num_streams, verbose):
                 return 1
-        result.setMetric("refresh_run_%s_stream_%s_func1" % (run, stream), result.stopTimer())
+        result.setMetric("refresh_stream_%s_func_1" % stream, result.stopTimer())
         #
-        if run_query_stream(conn, query_root, run, stream, num_streams, result, verbose):
+        if run_query_stream(conn, query_root, stream, num_streams, result, verbose):
             return 1
         #
         result.startTimer()
         if not read_only:
-            if refresh_func2(conn, data_dir, run, stream, num_streams, verbose):
+            if refresh_func2(conn, data_dir, stream, num_streams, verbose):
                 return 1
-        result.setMetric("refresh_run_%s_stream_%s_func2" % (run, stream), result.stopTimer())
+        result.setMetric("refresh_stream_%s_func_2" % stream, result.stopTimer())
         #
-        print("Power test run #%s finished." % run)
+        print("Power test finished.")
         if verbose:
             result.printMetrics()
-        result.saveMetrics("power%s" % run)
+        result.saveMetrics("power")
     except Exception as e:
         print("unable to run power test. DB connection failed: %s" % e)
         return 1
 
 
 def run_throughput_inner(query_root, data_dir, host, port, db_name, user, password,
-                         run, stream, num_streams, q, verbose):
+                         stream, num_streams, q, verbose):
     try:
         conn = PGDB(host, port, db_name, user, password)
         result = Result("ThroughputQueryStream%s" % stream)
-        if run_query_stream(conn, query_root, run, stream, num_streams, result, verbose):
-            print("unable to finish query run #%s stream #%s" % (run, stream))
+        if run_query_stream(conn, query_root, stream, num_streams, result, verbose):
+            print("unable to finish query in stream #%s" % stream)
             exit(1)
         q.put(result)
     except Exception as e:
-        print("unable to connect to DB for query run #%s stream #%s: %s" % (run, stream, e))
+        print("unable to connect to DB for query in stream #%s: %s" % (stream, e))
         exit(1)
 
 
 def run_throughput_test(query_root, data_dir, host, port, db_name, user, password,
-                        run, num_streams, verbose, read_only):
+                        num_streams, verbose, read_only):
     try:
-        print("Throughput test run #%s started ..." % run)
+        print("Throughput test started ...")
         conn = PGDB(host, port, db_name, user, password)
         total = Result("ThroughputTotal")
         total.startTimer()
@@ -567,10 +563,10 @@ def run_throughput_test(query_root, data_dir, host, port, db_name, user, passwor
         for i in range(num_streams):
             stream = i + 1
             # queries
-            print("Throughput test run #%s stream #%s started ..." % (run, stream))
+            print("Throughput test in stream #%s started ..." % stream)
             p = Process(target=run_throughput_inner,
                         args=(query_root, data_dir, host, port, db_name, user, password,
-                              run, stream, num_streams, q, verbose))
+                              stream, num_streams, q, verbose))
             processes.append(p)
             p.start()
         result = Result("ThroughputRefreshStream")
@@ -579,33 +575,33 @@ def run_throughput_test(query_root, data_dir, host, port, db_name, user, passwor
             # refresh functions
             result.startTimer()
             if not read_only:
-                if refresh_func1(conn, data_dir, run, stream, num_streams,verbose):
+                if refresh_func1(conn, data_dir, stream, num_streams, verbose):
                     return 1
-            result.setMetric("refresh_run_%s_stream_%s_func1" % (run, stream), result.stopTimer())
+            result.setMetric("refresh_stream_%s_func_1" % stream, result.stopTimer())
             #
             result.startTimer()
             if not read_only:
-                if refresh_func2(conn, data_dir, run, stream, num_streams, verbose):
+                if refresh_func2(conn, data_dir, stream, num_streams, verbose):
                     return 1
-            result.setMetric("refresh_run_%s_stream_%s_func2" % (run, stream), result.stopTimer())
+            result.setMetric("refresh_stream_%s_func_2" % stream, result.stopTimer())
             #
             q.put(result)
         for p in processes:
             p.join()
-        print("Throughput test run #%s (all streams) finished." % run)
+        print("Throughput test finished.")
         for i in range(q.qsize()):
             res = q.get(False)
             if verbose:
                 res.printMetrics()
-            res.saveMetrics("throughput%s" % run)
+            res.saveMetrics("throughput")
         #
-        total.setMetric("throughput_test_total_run_%s" % run, total.stopTimer())
+        total.setMetric("throughput_test_total", total.stopTimer())
         if verbose:
             total.printMetrics()
-        total.saveMetrics("throughput%s" % run)
+        total.saveMetrics("throughput")
         #
     except Exception as e:
-        print("unable to execute throughput tests in run #%s. e" % (run, e))
+        print("unable to execute throughput test: %s" % e)
         return 1
 
 
@@ -615,21 +611,6 @@ def niceprint(txt, width):
     print("*"*w + " " + txt + " " + " "*x + "*"*w)
 
 
-def reboot():
-    # TODO: we need another solution, this is fine for a local DB and running with sudo rights
-    # but the DB can be remote, user running the test has no sudo, etc.
-    width = 60
-    print("*"*width)
-    niceprint("Restarting PostgreSQL ...", width)
-    command = ['sudo', 'service', 'postgresql', 'restart'];
-    subprocess.call(command, shell=False) # shell=FALSE for sudo to work.
-    print("*"*width)
-    niceprint("Clearing OS caches ...", width)
-    # https://linux-mm.org/Drop_Caches
-    os.system('sudo sh -c "sync; echo 3 > /proc/sys/vm/drop_caches"')
-    print("*"*width)
-
-
 def scale_to_num_streams(scale):
     num_streams = 2
     if scale <= 1:
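
The hunk is cut off before the body of scale_to_num_streams; for orientation, here is a sketch of the mapping it implements, assuming it follows the TPC-H minimum-stream table (the exact thresholds in benchmark.py may differ):

    def scale_to_num_streams(scale):
        # assumed thresholds, per the TPC-H spec's minimum query streams per scale factor
        num_streams = 2
        if scale <= 1:
            num_streams = 2
        elif scale <= 10:
            num_streams = 3
        elif scale <= 30:
            num_streams = 4
        elif scale <= 100:
            num_streams = 5
        elif scale <= 300:
            num_streams = 6
        elif scale <= 1000:
            num_streams = 7
        else:
            num_streams = 8  # the spec's table continues for larger scale factors
        return num_streams
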
@@ -664,8 +645,7 @@ def get_json_files_from(path):
 def get_json_files(path):
     json_files = []
     for mode in ['power', 'throughput']:
-        for run in range(2):
-            json_files += get_json_files_from(path + "/" + mode + str(run) + "/")
+        json_files += get_json_files_from(path + "/" + mode + "/")
     return json_files
 
 
@@ -695,29 +675,26 @@ def qi(jsons, i, s): # execution time for query Qi within the query stream s
     # s is 0 for the power function and the position of the query stream for the throughput test
     assert(1 <= i <= 22)
     assert(0 <= s)
-    metric_name = 'run_%s_stream_%s_query_%s'
-    s0 = get_timedelta_in_seconds(jsons, metric_name % (0, s, i))
-    s1 = get_timedelta_in_seconds(jsons, metric_name % (1, s, i))
-    return ( s0 + s1 ) / 2 # simple average of two values
+    metric_name = 'query_stream_%s_query_%s'
+    ret = get_timedelta_in_seconds(jsons, metric_name % (s, i))
+    return ret
 
 
 def ri(jsons, j, s): # execution time for the refresh function RFi within a refresh stream s
     # j is the ordering function of the refresh function ranging from 1 to 2
     # s is 0 for the power function and the position of the pair of refresh functions in the stream for the throughput test
     assert(j == 1 or j == 2)
     assert(0 <= s)
-    metric_name = 'refresh_run_%s_stream_%s_func%s'
-    s0 = get_timedelta_in_seconds(jsons, metric_name % (0, s, j))
-    s1 = get_timedelta_in_seconds(jsons, metric_name % (1, s, j))
-    return ( s0 + s1 ) / 2 # simple average of two values
+    metric_name = 'refresh_stream_%s_func_%s'
+    ret = get_timedelta_in_seconds(jsons, metric_name % (s, j))
+    return ret
 
 
 def ts(jsons): # total time needed to execute the throughput test
     # TODO: total time for throughput tests needs to be implemented
-    metric_name = 'throughput_test_total_run_%s'
-    s0 = get_timedelta_in_seconds(jsons, metric_name % 0)
-    s1 = get_timedelta_in_seconds(jsons, metric_name % 1)
-    return ( s0 + s1 ) / 2
+    metric_name = 'throughput_test_total'
+    ret = get_timedelta_in_seconds(jsons, metric_name)
+    return ret
 
 def get_power_size(jsons, scale, num_streams):
     qi_product = 1
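
For context, qi, ri, and ts supply the inputs to the standard TPC-H composite metrics that get_power_size begins to compute below. Per the TPC-H specification, with scale factor SF and S throughput streams:

    Power@Size = \frac{3600 \cdot SF}{\left( \prod_{i=1}^{22} QI(i,0) \cdot \prod_{j=1}^{2} RI(j,0) \right)^{1/24}}

    Throughput@Size = \frac{S \cdot 22 \cdot 3600}{T_s} \cdot SF

The 24th root takes the geometric mean over the 22 queries and 2 refresh functions of the power stream, and 3600 converts per-second timings into an hourly rate.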
@@ -796,17 +773,16 @@ def main(phase, host, port, user, password, database, data_dir, query_root, dbge
         print("done creating indexes and foreign keys")
         result.printMetrics()
     elif phase == "query":
-        for run in [0,1]:
-            # Power test
-            if run_power_test(query_root, data_dir, host, port, database, user, password,
-                              run, num_streams, verbose, read_only):
-                print("running power test failed")
-                exit(1)
-            # Throughput test
-            if run_throughput_test(query_root, data_dir, host, port, database, user, password,
-                                   run, num_streams, verbose, read_only):
-                print("running throughput test failed")
-                exit(1)
+        if run_power_test(query_root, data_dir, host, port, database, user, password,
+                          num_streams, verbose, read_only):
+            print("running power test failed")
+            exit(1)
+        # Throughput test
+        if run_throughput_test(query_root, data_dir, host, port, database, user, password,
+                               num_streams, verbose, read_only):
+            print("running throughput test failed")
+            exit(1)
+        print("done performance test")
         metrics(scale, num_streams)
 
 
@@ -844,4 +820,3 @@ def main(phase, host, port, user, password, database, data_dir, query_root, dbge
 
 ## main
 main(phase, host, port, user, password, database, data_dir, query_root, dbgen_dir, scale, num_streams, verbose, read_only)
-
0 commit comments

Comments
 (0)