forked from yugabyte/yugabyte-db
-
Notifications
You must be signed in to change notification settings - Fork 0
/
run_tests_on_spark.py
executable file
·1466 lines (1230 loc) · 60.1 KB
/
run_tests_on_spark.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
# Copyright (c) YugaByte, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations
# under the License.
#
"""
Run YugaByte tests on Spark using PySpark.
Example (mostly useful during testing this script):
cd ~/code/yugabyte
Run all C++ tests:
"$SPARK_INSTALLATION_DIR/bin/spark-submit" \
build-support/run_tests_on_spark.py \
--spark-master-url=spark://$SPARK_HOST:$SPARK_PORT \
--build-root "$PWD/build/release-gcc-dynamic-ninja" \
--verbose \
--reports-dir /tmp \
--write_report \
--save_report_to_build_dir \
--cpp \
--recreate_archive_for_workers
Run Java tests satisfying a particular regex:
"$SPARK_INSTALLATION_DIR/bin/spark-submit" \
build-support/run_tests_on_spark.py \
--spark-master-url=spark://$SPARK_HOST:$SPARK_PORT \
--build-root "$PWD/build/release-gcc-dynamic-ninja" \
--verbose \
--reports-dir=/tmp \
--write_report \
--save_report_to_build_dir \
--java \
--test_filter_re=org[.]yb[.].*Pg.* \
--send_archive_to_workers \
--recreate_archive_for_workers
"""
import argparse
import getpass
import glob
import gzip
import json
import logging
import os
import pwd
import random
import re
import shutil
import socket
import subprocess
import sys
import threading
import time
import traceback
import errno
import signal
from datetime import datetime
from collections import defaultdict
from typing import List, Dict, Set, Tuple, Optional, Any, cast
# Directory containing this script (yugabyte-db/build-support).
BUILD_SUPPORT_DIR = os.path.dirname(os.path.realpath(__file__))

# An upper bound on a single test's running time. In practice there are multiple other timeouts
# that should be triggered earlier.
TEST_TIMEOUT_UPPER_BOUND_SEC = 35 * 60

# We wait for a special "flag file" to appear indicating that the test has started running, which is
# created by the about_to_start_running_test in common-test-env.sh. If this does not happen within
# this amount of time, we terminate the run-test.sh script. This should prevent tests getting stuck
# for a long time in macOS builds.
TIME_SEC_TO_START_RUNNING_TEST = 5 * 60

# Defaults for maximum test failure threshold, after which the Spark job will be aborted.
DEFAULT_MAX_NUM_TEST_FAILURES_MACOS_DEBUG = 150
DEFAULT_MAX_NUM_TEST_FAILURES = 100

# Default for test artifact size limit, in bytes. Can be overridden via the
# YB_SPARK_MAX_ARTIFACT_SIZE_BYTES environment variable (see copy_to_host).
MAX_ARTIFACT_SIZE_BYTES = 100*1024*1024  # 100 MB (comment previously said 10 MB, inconsistent)
def wait_for_path_to_exist(target_path: str) -> None:
    """
    Block until target_path exists on the local filesystem.

    Polls every 0.1 seconds, printing a message to stderr at most every 5 seconds, and raises
    IOError if the path still does not exist after 120 seconds. (Removed an unused
    "waited_for_sec" local that was assigned but never read.)

    :param target_path: path whose appearance we are waiting for.
    :raises IOError: if the path does not appear within the timeout.
    """
    if os.path.exists(target_path):
        return
    start_time_sec = time.time()
    printed_msg_at_sec: float = 0.0
    MSG_PRINT_INTERVAL_SEC = 5.0
    TIMEOUT_SEC = 120
    while not os.path.exists(target_path):
        current_time_sec = time.time()
        if current_time_sec - printed_msg_at_sec >= MSG_PRINT_INTERVAL_SEC:
            sys.stderr.write("Path '%s' does not exist, waiting\n" % target_path)
            printed_msg_at_sec = current_time_sec
        if current_time_sec - start_time_sec >= TIMEOUT_SEC:
            raise IOError(
                "Timed out after %.1f seconds waiting for path to exist: %s" % (
                    current_time_sec - start_time_sec, target_path
                ))
        time.sleep(0.1)
    elapsed_time = time.time() - start_time_sec
    sys.stderr.write("Waited for %.1f seconds for the path '%s' to appear\n" % (
        elapsed_time, target_path
    ))
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'python'))
from yb import yb_dist_tests # noqa
from yb import command_util # noqa
from yb.common_util import set_to_comma_sep_str, is_macos # noqa
# Special Jenkins environment variables. They are propagated to tasks running in a distributed way
# on Spark.
JENKINS_ENV_VARS = [
    "BUILD_TYPE",
    "BUILD_ID",
    "BUILD_NUMBER",
    "BUILD_TAG",
    "BUILD_URL",
    "CVS_BRANCH",
    "EXECUTOR_NUMBER",
    "GIT_BRANCH",
    "GIT_COMMIT",
    "GIT_URL",
    "JAVA_HOME",
    "JENKINS_URL",
    "JOB_NAME",
    "NODE_NAME",
    "SVN_REVISION",
    "WORKSPACE",
]

# In addition, all variables with names starting with the following prefix are propagated.
PROPAGATED_ENV_VAR_PREFIX = 'YB_'

# Default Spark master URLs per platform/build flavor; each can be overridden through the
# corresponding environment variable.
SPARK_URLS = {
    'linux_default': os.getenv(
        'YB_LINUX_PY3_SPARK_URL',
        'spark://spark-for-yugabyte-linux-default.example.com:7077'),
    'linux_asan_tsan': os.getenv(
        'YB_ASAN_TSAN_PY3_SPARK_URL',
        'spark://spark-for-yugabyte-linux-asan-tsan.example.com:7077'),
    'macos': os.getenv(
        'YB_MACOS_PY3_SPARK_URL',
        'spark://spark-for-yugabyte-macos.example.com:7077'),
}

# This has to match what we output in run-test.sh if YB_LIST_CTEST_TESTS_ONLY is set.
CTEST_TEST_PROGRAM_RE = re.compile(r'^.* ctest test: \"(.*)\"$')

# Non-gtest tests and tests with internal dependencies that we should run in one shot. This almost
# duplicates a list from common-test-env.sh, but that is probably OK since we should not be adding
# new such tests.
ONE_SHOT_TESTS = set([
    'merge_test',
    'c_test',
    'compact_on_deletion_collector_test',
    'db_sanity_test',
    'tests-rocksdb/thread_local_test'])

# Matches a trailing "#..." comment, e.g. the "# GetParam() = ..." annotations in
# --gtest_list_tests output.
HASH_COMMENT_RE = re.compile('#.*$')

# Global variables. Some of these are used on the remote worker side.
verbose: bool = False
# Spark master URL explicitly supplied on the command line, if any; overrides SPARK_URLS.
g_spark_master_url_override: Optional[str] = None
# Environment variables captured on the driver and re-applied on Spark workers.
propagated_env_vars: Dict[str, str] = {}
# Serialized (dict) form of the global test configuration, shipped to Spark workers.
global_conf_dict: Optional[Dict[str, Any]] = None
# The pyspark.SparkContext, created lazily by init_spark_context().
spark_context: Any = None
archive_sha256sum: Optional[str] = None
# Abort threshold for the number of failed tests; set from command-line/env defaults.
g_max_num_test_failures: int = sys.maxsize
def configure_logging() -> None:
    """Set up INFO-level logging with a file-name/line-number prefix on every message."""
    logging.basicConfig(
        format="[%(filename)s:%(lineno)d] %(asctime)s %(levelname)s: %(message)s",
        level=logging.INFO)
def is_pid_running(pid: int) -> bool:
    """Return True if the given pid refers to a live, non-zombie process."""
    import psutil  # type: ignore
    try:
        # A zombie has exited but has not been reaped; treat it as not running.
        return psutil.Process(pid).status() != psutil.STATUS_ZOMBIE
    except psutil.NoSuchProcess:
        return False
def delete_if_exists_log_errors(file_path: str) -> None:
    """
    Delete the given file or directory tree if it exists, logging (but not raising)
    any OSError encountered.

    :param file_path: path of the file or directory to remove.
    """
    if os.path.exists(file_path):
        try:
            if os.path.isdir(file_path):
                # Use the standard library instead of shelling out to 'rm -rf'.
                shutil.rmtree(file_path)
            else:
                os.remove(file_path)
        except OSError as os_error:
            logging.error("Error deleting file %s: %s", file_path, os_error)
def log_heading(msg: str) -> None:
    """Log the message framed by 80-dash separator lines above and below."""
    separator = '-' * 80
    logging.info('\n%s\n%s\n%s' % (separator, msg, separator))
# Initializes the spark context. The details list will be incorporated in the Spark application
# name visible in the Spark web UI.
def init_spark_context(details: Optional[List[str]] = None) -> None:
    """
    Initialize the global Spark context if it has not been created yet.

    :param details: optional extra strings included in the Spark application name.
        (Previously this was a mutable default argument ``[]`` that was mutated in place
        via ``+=``, so repeated calls accumulated entries; fixed by defaulting to None
        and copying the caller's list.)
    """
    global spark_context
    if spark_context:
        return
    log_heading("Initializing Spark context")
    global_conf = yb_dist_tests.get_global_conf()
    build_type = global_conf.build_type
    from pyspark import SparkContext  # type: ignore
    spark_master_url = g_spark_master_url_override
    if spark_master_url is None:
        if is_macos():
            logging.info("This is macOS, using the macOS Spark cluster")
            spark_master_url = SPARK_URLS['macos']
        elif build_type in ['asan', 'tsan']:
            logging.info("Using a separate Spark cluster for ASAN and TSAN tests")
            spark_master_url = SPARK_URLS['linux_asan_tsan']
        else:
            logging.info("Using the regular Spark cluster for non-ASAN/TSAN tests")
            spark_master_url = SPARK_URLS['linux_default']

    logging.info("Spark master URL: %s", spark_master_url)
    spark_master_url = os.environ.get('YB_SPARK_MASTER_URL', spark_master_url)
    # Work on a copy so the caller's list (and the default) are never mutated.
    details = list(details or []) + [
        'user: {}'.format(getpass.getuser()),
        'build type: {}'.format(build_type)
    ]
    if 'BUILD_URL' in os.environ:
        details.append('URL: {}'.format(os.environ['BUILD_URL']))
    spark_context = SparkContext(spark_master_url, "YB tests: {}".format(' '.join(details)))
    yb_python_zip_path = yb_dist_tests.get_tmp_filename(
        prefix='yb_python_module_for_spark_workers_', suffix='.zip', auto_remove=True)
    logging.info("Creating a zip archive with the yb python module at %s", yb_python_zip_path)
    subprocess.check_call(
        ['zip', '--recurse-paths', '--quiet', yb_python_zip_path, 'yb',
         '-x', '*.sw?', '-x', '*.pyc'],
        cwd=os.path.join(global_conf.yb_src_root, 'python'))
    spark_context.addPyFile(yb_python_zip_path)
    if global_conf.archive_for_workers is not None:
        logging.info("Will send the archive %s to all Spark workers",
                     global_conf.archive_for_workers)
        spark_context.addFile(global_conf.archive_for_workers)

    log_heading("Initialized Spark context")
def set_global_conf_for_spark_jobs() -> None:
    """Snapshot the global test configuration as a plain dict for shipping to Spark workers."""
    global global_conf_dict
    current_conf = yb_dist_tests.get_global_conf()
    global_conf_dict = vars(current_conf)
def get_bash_path() -> str:
    """Return the bash binary to use: Homebrew's Bash on macOS, the system one elsewhere."""
    return '/usr/local/bin/bash' if sys.platform == 'darwin' else '/bin/bash'
def copy_spark_stderr(test_descriptor_str: str, build_host: str) -> None:
    """
    If the initialization or the test fails, copy the Spark worker stderr log back to build host.
    :param test_descriptor_str: Test descriptor to figure out the correct name for the log file.
    :param build_host: Host to which the log will be copied.
    :return: None
    """
    try:
        from pyspark import SparkFiles  # type: ignore
        spark_stderr_src = os.path.join(os.path.abspath(SparkFiles.getRootDirectory()), 'stderr')
        test_descriptor = yb_dist_tests.TestDescriptor(test_descriptor_str)
        error_output_path = test_descriptor.error_output_path
        spark_stderr_dest = error_output_path.replace('__error.log', '__spark_stderr.log')
        error_log_dir_path = os.path.dirname(spark_stderr_dest)
        if not os.path.isdir(error_log_dir_path):
            subprocess.check_call(['mkdir', '-p', error_log_dir_path])
        logging.info(f"Copying spark stderr {spark_stderr_src} to {spark_stderr_dest}")
        shutil.copyfile(spark_stderr_src, spark_stderr_dest)
        copy_to_host([spark_stderr_dest], build_host)
    except Exception:
        # Best-effort only: failing to save the Spark stderr log must never mask the original
        # error. (Fixed a stray '}' left in the log message, and dropped the unused 'e' binding --
        # logging.exception already records the traceback.)
        logging.exception("Error copying spark stderr log")
def parallel_run_test(test_descriptor_str: str, fail_count: Any) -> yb_dist_tests.TestResult:
    """
    This is invoked in parallel to actually run tests.

    :param test_descriptor_str: serialized test descriptor identifying the test to run.
    :param fail_count: accumulator-like object; .add(1) is called when the test fails.
    :return: a yb_dist_tests.TestResult describing the outcome.
    """
    try:
        global_conf = initialize_remote_task()
    except Exception:
        # If worker initialization itself fails, try to preserve the Spark stderr log for
        # debugging before propagating the error.
        build_host = os.environ.get('YB_BUILD_HOST', None)
        if build_host:
            copy_spark_stderr(test_descriptor_str, build_host)
        raise
    from yb import yb_dist_tests

    wait_for_path_to_exist(global_conf.build_root)

    # Created files/directories will be writable by the group.
    old_umask = os.umask(2)

    test_descriptor = yb_dist_tests.TestDescriptor(test_descriptor_str)

    # This is saved in the test result file by process_test_result.py.
    os.environ['YB_TEST_DESCRIPTOR_STR'] = test_descriptor_str

    os.environ['YB_TEST_ATTEMPT_INDEX'] = str(test_descriptor.attempt_index)
    os.environ['build_type'] = global_conf.build_type
    os.environ['YB_RUNNING_TEST_ON_SPARK'] = '1'
    os.environ['BUILD_ROOT'] = global_conf.build_root

    test_started_running_flag_file = yb_dist_tests.get_tmp_filename(
        prefix='yb_test_started_running_flag_file')

    os.environ['YB_TEST_STARTED_RUNNING_FLAG_FILE'] = test_started_running_flag_file
    os.environ['YB_TEST_EXTRA_ERROR_LOG_PATH'] = test_descriptor.error_output_path

    # Unique per-attempt temporary directory name.
    timestamp_str = datetime.now().strftime('%Y%m%d%H%M%S%f')
    random_part = ''.join([str(random.randint(0, 9)) for i in range(10)])
    test_tmp_dir = os.path.join(
        os.environ.get('YB_TEST_TMP_BASE_DIR', '/tmp'),
        f'yb_test.{test_descriptor.str_for_file_name()}.{timestamp_str}.{random_part}')
    os.environ['TEST_TMPDIR'] = test_tmp_dir

    artifact_list_path = yb_dist_tests.get_tmp_filename(
        prefix='yb_test_artifact_list', suffix='.txt')
    os.environ['YB_TEST_ARTIFACT_LIST_PATH'] = artifact_list_path
    logging.info("Setting YB_TEST_ARTIFACT_LIST_PATH to %s", artifact_list_path)

    # We could use "run_program" here, but it collects all the output in memory, which is not
    # ideal for a large amount of test log output. The "tee" part also makes the output visible in
    # the standard error of the Spark task as well, which is sometimes helpful for debugging.
    def run_test() -> Tuple[int, float]:
        """Launch the test via run-test.sh, enforcing timeouts; return (exit_code, elapsed_sec)."""
        start_time_sec = time.time()
        error_log_dir_path = os.path.dirname(os.path.abspath(test_descriptor.error_output_path))
        if not os.path.isdir(error_log_dir_path):
            subprocess.check_call(['mkdir', '-p', error_log_dir_path])
        runner_oneline = 'set -o pipefail; "%s" %s 2>&1 | tee "%s"; exit ${PIPESTATUS[0]}' % (
            global_conf.get_run_test_script_path(),
            test_descriptor.args_for_run_test,
            test_descriptor.error_output_path
        )
        process = subprocess.Popen(
            [get_bash_path(), '-c', runner_oneline]
        )

        found_flag_file = False
        while is_pid_running(process.pid):
            elapsed_time_sec = time.time() - start_time_sec
            termination_reason = None
            if elapsed_time_sec > TEST_TIMEOUT_UPPER_BOUND_SEC:
                termination_reason = 'ran longer than %d seconds' % TEST_TIMEOUT_UPPER_BOUND_SEC

            failed_to_launch = False
            if not found_flag_file:
                if os.path.exists(test_started_running_flag_file):
                    found_flag_file = True
                elif elapsed_time_sec > TIME_SEC_TO_START_RUNNING_TEST:
                    termination_reason = (
                        'could not start running the test in %d seconds (file %s not created). '
                        'Ran command: {{ %s }}.'
                    ) % (
                        TIME_SEC_TO_START_RUNNING_TEST,
                        test_started_running_flag_file,
                        runner_oneline
                    )
                    failed_to_launch = True

            if termination_reason:
                error_msg = "Test %s is being terminated (ran for %.1f seconds), reason: %s" % (
                    test_descriptor, elapsed_time_sec, termination_reason
                )
                logging.info(error_msg)
                try:
                    os.kill(process.pid, signal.SIGKILL)
                except OSError as os_error:
                    if os_error.errno == errno.ESRCH:
                        logging.info(
                            "Process with pid %d disappeared suddenly, that's OK",
                            process.pid)
                    else:
                        # Fixed: the original re-raised unconditionally after logging, which
                        # defeated the ESRCH ("no such process") special case above.
                        raise

                if failed_to_launch:
                    # This exception should bubble up to Spark and cause it to hopefully re-run the
                    # test on some other node.
                    raise RuntimeError(error_msg)
                break

            time.sleep(0.5)

        exit_code = process.wait()

        elapsed_time_sec = time.time() - start_time_sec
        logging.info("Test %s ran on %s in %.1f seconds, rc=%d",
                     test_descriptor, socket.gethostname(), elapsed_time_sec, exit_code)
        if exit_code != 0:
            fail_count.add(1)
        return exit_code, elapsed_time_sec
    # End of the local run_test() function.

    try:
        exit_code, elapsed_time_sec = run_test()
        error_output_path = test_descriptor.error_output_path

        failed_without_output = False
        if os.path.isfile(error_output_path) and os.path.getsize(error_output_path) == 0:
            # Empty error output file (<something>__error.log).
            if exit_code == 0:
                # Test succeeded, no error output.
                os.remove(error_output_path)
            else:
                failed_without_output = True

        artifact_paths = []
        num_errors_copying_artifacts = 0
        rel_artifact_paths = None
        if global_conf.archive_for_workers:
            artifact_paths = [error_output_path]
            if os.path.exists(artifact_list_path):
                with open(artifact_list_path) as artifact_list_file:
                    for artifact_path_pattern in artifact_list_file:
                        artifact_path_pattern = artifact_path_pattern.strip()
                        if not artifact_path_pattern:
                            continue
                        logging.info("Artifact pattern to copy to main build host: '%s'",
                                     artifact_path_pattern)
                        glob_result = glob.glob(os.path.abspath(artifact_path_pattern))
                        artifact_paths.extend(glob_result)
                        if not glob_result:
                            logging.warning("No artifacts found for pattern: '%s'",
                                            artifact_path_pattern)
            else:
                logging.warning("Artifact list does not exist: '%s'", artifact_list_path)

            build_host = os.environ.get('YB_BUILD_HOST')
            assert build_host is not None
            num_errors_copying_artifacts = copy_to_host(artifact_paths, build_host)
            if exit_code != 0:
                copy_spark_stderr(test_descriptor_str, build_host)

            rel_artifact_paths = [
                os.path.relpath(os.path.abspath(artifact_path), global_conf.yb_src_root)
                for artifact_path in artifact_paths
            ]
        return yb_dist_tests.TestResult(
            exit_code=exit_code,
            test_descriptor=test_descriptor,
            elapsed_time_sec=elapsed_time_sec,
            failed_without_output=failed_without_output,
            artifact_paths=rel_artifact_paths,
            num_errors_copying_artifacts=num_errors_copying_artifacts)
    finally:
        delete_if_exists_log_errors(test_tmp_dir)
        delete_if_exists_log_errors(test_started_running_flag_file)
        delete_if_exists_log_errors(artifact_list_path)
        os.umask(old_umask)
def get_bash_shebang() -> str:
    """
    Return the interpreter to put in generated scripts' shebang lines.
    Prefer /usr/local/bin/bash as we install Bash 4+ there on macOS.
    """
    homebrew_bash = '/usr/local/bin/bash'
    return homebrew_bash if os.path.exists(homebrew_bash) else '/usr/bin/env bash'
# This is executed on a Spark executor as part of running a task.
def initialize_remote_task() -> yb_dist_tests.GlobalTestConfig:
    """
    Restore the global test configuration on a Spark worker and, when a build archive was
    shipped to workers, verify its checksum and unpack it into the same source-root path
    used on the main build node (guarded by a flock-based lock in a generated bash script).

    :return: the restored global test configuration.
    """
    configure_logging()
    assert global_conf_dict is not None
    global_conf = yb_dist_tests.set_global_conf_from_dict(global_conf_dict)
    global_conf.set_env_on_spark_worker(propagated_env_vars)
    if not global_conf.archive_for_workers:
        return global_conf

    from pyspark import SparkFiles  # type: ignore
    archive_name = os.path.basename(SparkFiles.get(global_conf.archive_for_workers))
    expected_archive_sha256sum = global_conf.archive_sha256sum
    assert expected_archive_sha256sum is not None

    worker_tmp_dir = os.path.abspath(SparkFiles.getRootDirectory())
    archive_path = os.path.join(worker_tmp_dir, archive_name)
    if not os.path.exists(archive_path):
        raise IOError("Archive not found: %s" % archive_path)
    # We install the code into the same path where it was installed on the main build node (Jenkins
    # worker or dev server), but put it in as separate variable to have flexibility to change it
    # later.
    remote_yb_src_root = global_conf.yb_src_root
    remote_yb_src_job_dir = os.path.dirname(remote_yb_src_root)
    # Job clean-up directory should be common on spark worker node for all jobs and should be on
    # same filesystem as yb_src_root for efficient two-stage clean-up.
    remote_yb_src_removal = os.environ.get('YB_SPARK_CLEAN_DIR', '/tmp/SPARK_TO_REMOVE')

    # Fixed: pre-initialize so the "finally" clause cannot hit a NameError (masking the real
    # error) if an exception is raised before the script path is assigned, e.g. if the
    # mkdir call below fails.
    untar_script_path = None
    try:
        subprocess.check_call([
            'mkdir',
            '-p',
            remote_yb_src_removal,
            remote_yb_src_job_dir])
        untar_script_path = os.path.join(
            worker_tmp_dir, 'untar_archive_once_%d.sh' % random.randint(0, 2**64))
        # We also copy the temporary script here for later reference.
        untar_script_path_for_reference = os.path.join(
            worker_tmp_dir, 'untar_archive_once.sh')
        lock_path = '/tmp/yb_dist_tests_update_archive%s.lock' % (
            global_conf.yb_src_root.replace('/', '__'))
        bash_shebang = get_bash_shebang()
        with open(untar_script_path, 'w') as untar_script_file:
            # Do the locking using the flock command in Bash -- file locking in Python is painful.
            # Some curly braces in the script template are escaped as "{{" and }}".
            untar_script_file.write("""#!{bash_shebang}
set -euo pipefail
(
PATH=/usr/local/bin:$PATH
# Options for asynchronous clean-up jobs.
# Optional argument for checking workspace path associated with this spark app.
if [[ "${{1:-none}}" == "path" ]]; then
echo '{remote_yb_src_root}'
exit 0
fi
# Optional argument for marking workspace to be removed.
if [[ "${{1:-none}}" == "remove" ]]; then
rm -f '{archive_path}'
mv '{remote_yb_src_job_dir}' '{remote_yb_src_removal}/'
exit 0
fi
flock -w 60 200
# Clean up any pending removals before we unpack the archive.
rm -rf {remote_yb_src_removal}/*
# Check existing workspace.
if [[ -d '{remote_yb_src_root}' ]]; then
previous_sha256_file_path='{remote_yb_src_root}/extracted_from_archive.sha256'
if [[ ! -f $previous_sha256_file_path ]]; then
# Prevent accidental deletion of directories that were not installed by untarring
# an archive.
echo "File $previous_sha256_file_path does not exist!" >&2
exit 1
fi
previous_sha256sum=$(<"$previous_sha256_file_path")
if [[ $previous_sha256sum == '{expected_archive_sha256sum}' ]]; then
echo "Found existing archive installation at '{remote_yb_src_root}' with correct" \\
"expected checksum '$previous_sha256sum'."
else
echo "Removing '{remote_yb_src_root}': it was installed from archive with checksum" \\
"'$previous_sha256sum' but we are installing one with checksum" \\
"'{expected_archive_sha256sum}'."
rm -rf '{remote_yb_src_root}'
fi
fi
if [[ ! -d '{remote_yb_src_root}' ]]; then
if [[ ! -f '{untar_script_path_for_reference}' ]]; then
cp '{untar_script_path}' '{untar_script_path_for_reference}'
fi
actual_archive_sha256sum=$( (
[[ $OSTYPE == linux* ]] && sha256sum '{archive_path}' ||
shasum --algorithm 256 '{archive_path}'
) | awk '{{ print $1 }}' )
if [[ $actual_archive_sha256sum != '{expected_archive_sha256sum}' ]]; then
echo "Archive SHA256 sum of '{archive_path}' is $actual_archive_sha256sum, which" \\
"does not match expected value: {expected_archive_sha256sum}." >&2
exit 1
fi
chmod 0755 '{untar_script_path_for_reference}'
yb_src_root_extract_tmp_dir='{remote_yb_src_root}'.$RANDOM.$RANDOM.$RANDOM.$RANDOM
mkdir -p "$yb_src_root_extract_tmp_dir"
if [[ -x /bin/pigz ]]; then
# Decompress faster with pigz
/bin/pigz -dc '{archive_path}' | tar xf - -C "$yb_src_root_extract_tmp_dir"
else
tar xzf '{archive_path}' -C "$yb_src_root_extract_tmp_dir"
fi
echo '{expected_archive_sha256sum}' \\
>"$yb_src_root_extract_tmp_dir/extracted_from_archive.sha256"
mv "$yb_src_root_extract_tmp_dir" '{remote_yb_src_root}'
fi
) 200>'{lock_path}'
""".format(**locals()))
        os.chmod(untar_script_path, 0o755)
        subprocess.check_call(untar_script_path)
    except subprocess.CalledProcessError as e:
        # NOTE(review): check_call does not capture output, so e.stdout/e.stderr are normally
        # None here; kept for the case where a future change starts capturing them.
        logging.exception(f"Error initializing the remote task:\n"
                          f"STDOUT: {e.stdout}\n"
                          f"STDERR: {e.stderr}")
        raise
    finally:
        if untar_script_path is not None and os.path.exists(untar_script_path):
            os.remove(untar_script_path)
    return global_conf
def parallel_list_test_descriptors(rel_test_path: str) -> List[str]:
    """
    This is invoked in parallel to list all individual tests within our C++ test programs. Without
    this, listing all gtest tests across 330 test programs might take about 5 minutes on TSAN and 2
    minutes in debug.

    :param rel_test_path: test program path relative to the build root.
    :return: list of test descriptor strings, one per individual gtest test.
    """
    from yb import yb_dist_tests, command_util
    global_conf = initialize_remote_task()
    os.environ['BUILD_ROOT'] = global_conf.build_root
    find_or_download_thirdparty_script_path = os.path.join(
        global_conf.yb_src_root, 'build-support', 'find_or_download_thirdparty.sh')
    subprocess.check_call(find_or_download_thirdparty_script_path)

    wait_for_path_to_exist(global_conf.build_root)
    list_tests_cmd_line = [
        os.path.join(global_conf.build_root, rel_test_path), '--gtest_list_tests']

    try:
        prog_result = command_util.run_program(list_tests_cmd_line)
    except OSError:
        # Fixed: the exception was previously bound to an unused name "ex".
        logging.error("Failed running the command: %s", list_tests_cmd_line)
        raise

    # --gtest_list_tests gives us the following output format:
    #  TestSplitArgs.
    #    Simple
    #    SimpleWithSpaces
    #    SimpleWithQuotes
    #    BadWithQuotes
    #    Empty
    #    Error
    #    BloomFilterReverseCompatibility
    #    BloomFilterWrapper
    #    PrefixExtractorFullFilter
    #    PrefixExtractorBlockFilter
    #    PrefixScan
    #    OptimizeFiltersForHits
    #  BloomStatsTestWithParam/BloomStatsTestWithParam.
    #    BloomStatsTest/0  # GetParam() = (true, true)
    #    BloomStatsTest/1  # GetParam() = (true, false)
    #    BloomStatsTest/2  # GetParam() = (false, false)
    #    BloomStatsTestWithIter/0  # GetParam() = (true, true)
    #    BloomStatsTestWithIter/1  # GetParam() = (true, false)
    #    BloomStatsTestWithIter/2  # GetParam() = (false, false)

    current_test: Optional[str] = None
    test_descriptors: List[str] = []
    test_descriptor_prefix = rel_test_path + yb_dist_tests.TEST_DESCRIPTOR_SEPARATOR
    for line in prog_result.stdout.split("\n"):
        if ('Starting tracking the heap' in line or 'Dumping heap profile to' in line):
            continue
        line = line.rstrip()
        # Strip trailing "# ..." annotations such as "# GetParam() = ...".
        trimmed_line = HASH_COMMENT_RE.sub('', line.strip()).strip()
        if line.startswith(' '):
            # Indented line: an individual test in the current test case.
            assert current_test is not None
            test_descriptors.append(test_descriptor_prefix + current_test + trimmed_line)
        else:
            # Non-indented line: a new test case (suite) name.
            current_test = trimmed_line

    return test_descriptors
def get_username() -> str:
    """
    Determine the current user name, falling back through several mechanisms because
    os.getlogin() can fail when there is no controlling terminal.
    """
    try:
        return os.getlogin()
    except OSError as login_error:
        logging.warning(
            "Got an OSError trying to get the current user name, "
            "trying a workaround: {}".format(login_error))
        # https://github.com/gitpython-developers/gitpython/issues/39
        try:
            return pwd.getpwuid(os.getuid()).pw_name
        except KeyError as pwd_error:
            env_user = os.getenv('USER')
            if env_user:
                return env_user
            id_cmd_output = subprocess.check_output('id').strip().decode('utf-8')
            uid_pattern = re.compile(r'^uid=\d+[(]([^)]+)[)]\s.*')
            parsed = uid_pattern.match(id_cmd_output)
            if parsed:
                return parsed.group(1)
            logging.warning(
                "Could not get user name from the environment, and could not parse 'id' output: %s",
                id_cmd_output)
            raise pwd_error
def get_mac_shared_nfs(path: str) -> str:
    """
    Translate a local macOS '/Volumes/share' path into the corresponding NFS path on the
    build host identified by the YB_BUILD_HOST environment variable.
    """
    LOCAL_PATH = "/Volumes/share"
    if not path.startswith(LOCAL_PATH):
        raise ValueError("Local path %s does not start with expected prefix '%s'.\n" %
                         (path, LOCAL_PATH))
    yb_build_host = os.environ.get('YB_BUILD_HOST')
    if yb_build_host is None:
        raise ValueError("The YB_BUILD_HOST environment variable is not set")
    suffix = path[len(LOCAL_PATH):]
    return "/Volumes/net/v1/" + yb_build_host + suffix
def copy_to_host(artifact_paths: List[str], build_host: str) -> int:
    """
    Provide compatibility to copy artifacts back to build host via NFS or SSH.

    :param artifact_paths: local paths of the artifact files to copy.
    :param build_host: the main build host to copy the artifacts to.
    :return: the number of errors encountered while copying.
    """
    num_errors_copying_artifacts = 0
    if is_macos() and socket.gethostname() == build_host:
        logging.info("Files already local to build host. Skipping artifact copy.")
    else:
        # NFS copy is the default; set YB_SPARK_COPY_MODE=SSH to use scp instead.
        ssh_mode = os.getenv('YB_SPARK_COPY_MODE') == 'SSH'
        num_artifacts_copied = 0
        artifact_size_limit = int(os.getenv('YB_SPARK_MAX_ARTIFACT_SIZE_BYTES',
                                            MAX_ARTIFACT_SIZE_BYTES))
        for artifact_path in artifact_paths:
            if not os.path.exists(artifact_path):
                logging.warning("Build artifact file does not exist: '%s'", artifact_path)
                continue
            artifact_size = os.path.getsize(artifact_path)
            if artifact_size > artifact_size_limit:
                logging.warning(
                    "Build artifact file {} of size {} bytes exceeds max limit of {} bytes".format(
                        artifact_path, artifact_size, artifact_size_limit)
                )
                continue
            if ssh_mode:
                dest_dir = os.path.dirname(artifact_path)
                logging.info(f"Copying {artifact_path} to {build_host}:{dest_dir}")
            else:
                if is_macos():
                    dest_path = get_mac_shared_nfs(artifact_path)
                else:
                    dest_path = yb_dist_tests.to_real_nfs_path(artifact_path)
                dest_dir = os.path.dirname(dest_path)
                logging.info(f"Copying {artifact_path} to {dest_dir}")
            try:
                if ssh_mode:
                    subprocess.check_call(['ssh', build_host, 'mkdir', '-p', dest_dir])
                    subprocess.check_call(['scp', artifact_path, f'{build_host}:{dest_dir}/'])
                else:
                    subprocess.check_call(['mkdir', '-p', dest_dir])
                    subprocess.check_call(['cp', '-f', artifact_path, dest_path])
                # Fixed: only count the artifact as copied when the copy succeeded (previously
                # the counter was incremented even after a failed copy).
                num_artifacts_copied += 1
            except subprocess.CalledProcessError as ex:
                logging.error("Error copying %s to %s: %s", artifact_path, dest_dir, ex)
                num_errors_copying_artifacts += 1
        logging.info("Number of build artifact files copied: %d", num_artifacts_copied)
    return num_errors_copying_artifacts
def get_jenkins_job_name() -> Optional[str]:
    """Return the Jenkins job name from the environment, or None when not running on Jenkins."""
    return os.getenv('JOB_NAME')
def get_jenkins_job_name_path_component() -> str:
    """Return a path-friendly component derived from the Jenkins job name, if one is set."""
    jenkins_job_name = get_jenkins_job_name()
    return ("job_" + jenkins_job_name) if jenkins_job_name else "unknown_jenkins_job"
def get_report_parent_dir(report_base_dir: str) -> str:
    """
    @return a directory to store build report, relative to the given base directory. Path components
    are based on build type, Jenkins job name, etc.
    """
    conf = yb_dist_tests.get_global_conf()
    return os.path.join(
        report_base_dir, conf.build_type, get_jenkins_job_name_path_component())
def save_json_to_paths(
        short_description: str,
        json_data: Any,
        output_paths: List[str],
        should_gzip: bool = False) -> None:
    """
    Saves the given JSON-friendly data structure to the list of paths (exact copy at each path),
    optionally gzipping the output.

    :param short_description: human-readable description used in log messages.
    :param json_data: JSON-serializable data structure to write.
    :param output_paths: destination paths (None entries are skipped); each must end in '.json'.
    :param should_gzip: when True, write '<path>.gz' with gzip compression instead of plain text.
    """
    json_data_str = json.dumps(json_data, sort_keys=True, indent=2) + "\n"
    for output_path in output_paths:
        if output_path is None:
            continue
        assert output_path.endswith('.json'), \
            "Expected output path to end with .json: {}".format(output_path)
        final_output_path = output_path + ('.gz' if should_gzip else '')
        logging.info("Saving {} to {}".format(short_description, final_output_path))
        # Fixed: the two file-handle names were swapped in the original (the gzip handle was
        # called "output_file_plain" and vice versa). Behavior is unchanged.
        if should_gzip:
            with gzip.open(final_output_path, 'wb') as gzip_output_file:
                gzip_output_file.write(json_data_str.encode('utf-8'))
        else:
            with open(final_output_path, 'w') as plain_output_file:
                plain_output_file.write(json_data_str)
def save_report(
        report_base_dir: str,
        results: List[yb_dist_tests.TestResult],
        total_elapsed_time_sec: float,
        spark_succeeded: bool,
        save_to_build_dir: bool = False) -> None:
    """
    Writes the full (and optionally short) JSON build report.

    @param report_base_dir: base directory for historical reports; falsy to skip historical copy.
    @param results: per-test results collected from the distributed run.
    @param total_elapsed_time_sec: wall-clock duration of the entire run.
    @param spark_succeeded: whether the overall Spark job succeeded.
    @param save_to_build_dir: if True, also write full/short reports into the build root.
    """
    historical_report_path = None
    global_conf = yb_dist_tests.get_global_conf()
    if report_base_dir:
        historical_report_parent_dir = get_report_parent_dir(report_base_dir)
        # Bug fix: the previous EEXIST handling executed "pass" and then fell through to an
        # unconditional "raise", so even a benign directory-creation race re-raised the error.
        # exist_ok=True makes directory creation idempotent.
        os.makedirs(historical_report_parent_dir, exist_ok=True)
        try:
            username = get_username()
        except Exception:  # noqa
            # Best effort: report generation should not fail because of a username lookup.
            logging.error("Could not get username, using 'unknown_user':\n%s",
                          traceback.format_exc())
            username = "unknown_user"
        historical_report_path = os.path.join(
            historical_report_parent_dir,
            '{}.json'.format('_'.join([
                global_conf.build_type,
                time.strftime('%Y-%m-%dT%H_%M_%S'),
                username,
                get_jenkins_job_name_path_component(),
                os.environ.get('BUILD_ID', 'unknown')])))
    test_reports_by_descriptor = {}
    for result in results:
        test_descriptor = result.test_descriptor
        test_report_dict = dict(
            elapsed_time_sec=result.elapsed_time_sec,
            exit_code=result.exit_code,
            language=test_descriptor.language,
            artifact_paths=result.artifact_paths
        )
        test_reports_by_descriptor[test_descriptor.descriptor_str] = test_report_dict
        if test_descriptor.error_output_path and os.path.isfile(
                test_descriptor.error_output_path):
            test_report_dict['error_output_path'] = test_descriptor.error_output_path
    jenkins_env_var_values = {
        env_var_name: os.environ[env_var_name]
        for env_var_name in JENKINS_ENV_VARS
        if env_var_name in os.environ
    }
    report = dict(
        # Consistency: use the local global_conf (obtained via get_global_conf above) instead of
        # reaching for yb_dist_tests.global_conf directly.
        conf=vars(global_conf),
        total_elapsed_time_sec=total_elapsed_time_sec,
        spark_succeeded=spark_succeeded,
        jenkins_env_vars=jenkins_env_var_values,
        tests=test_reports_by_descriptor
    )
    full_report_paths = []
    if historical_report_path:
        full_report_paths.append(historical_report_path)
    if save_to_build_dir:
        full_report_paths.append(os.path.join(global_conf.build_root, 'full_build_report.json'))
    save_json_to_paths('full build report', report, full_report_paths, should_gzip=True)
    if save_to_build_dir:
        # The short report omits per-test details but keeps conf/env/summary fields.
        del report['tests']
        short_report_path = os.path.join(global_conf.build_root, 'short_build_report.json')
        save_json_to_paths('short build report', report, [short_report_path], should_gzip=False)
def is_one_shot_test(rel_binary_path: str) -> bool:
    """
    Returns True if the given relative test-binary path identifies a "one-shot" test: either an
    exact entry in ONE_SHOT_TESTS, or a path whose basename component matches one of them.
    """
    return (rel_binary_path in ONE_SHOT_TESTS or
            any(rel_binary_path.endswith('/' + one_shot_name)
                for one_shot_name in ONE_SHOT_TESTS))
def collect_cpp_tests(
cpp_test_program_filter_list: List[str]) -> List[yb_dist_tests.TestDescriptor]:
"""
Collect C++ test programs to run.
@param cpp_test_program_filter_list: a list of C++ test program names to be used as a filter
"""
global_conf = yb_dist_tests.get_global_conf()
logging.info("Collecting the list of C++ test programs (locally; not a Spark job)")
start_time_sec = time.time()
build_root_realpath = os.path.realpath(global_conf.build_root)
ctest_cmd_result = command_util.run_program(
['/bin/bash',
'-c',
'cd "{}" && YB_LIST_CTEST_TESTS_ONLY=1 ctest -j8 --verbose'.format(
build_root_realpath)])
test_programs = []
for line in ctest_cmd_result.stdout.split("\n"):
re_match = CTEST_TEST_PROGRAM_RE.match(line)
if re_match:
ctest_test_program = re_match.group(1)
if ctest_test_program.startswith('/'):
ctest_test_program = os.path.realpath(ctest_test_program)
rel_ctest_prog_path = os.path.relpath(
os.path.realpath(ctest_test_program),
build_root_realpath)
if rel_ctest_prog_path.startswith('../'):
raise ValueError(
"Relative path to a ctest test binary ended up starting with '../', something "
"must be wrong: %s" % rel_ctest_prog_path)
test_programs.append(rel_ctest_prog_path)
test_programs = sorted(set(test_programs))
elapsed_time_sec = time.time() - start_time_sec
logging.info("Collected %d test programs in %.2f sec" % (
len(test_programs), elapsed_time_sec))
if cpp_test_program_filter_list:
cpp_test_program_filter = set(cpp_test_program_filter_list)
unfiltered_test_programs = test_programs
# test_program contains test paths relative to the root directory (including directory
# names), and cpp_test_program_filter contains basenames only.
test_programs = sorted(set([
test_program for test_program in test_programs
if os.path.basename(test_program) in cpp_test_program_filter
]))
logging.info("Filtered down to %d test programs using the list from test conf file" %
len(test_programs))
if unfiltered_test_programs and not test_programs:
# This means we've filtered the list of C++ test programs down to an empty set.
logging.info(
("NO MATCHING C++ TEST PROGRAMS FOUND! Test programs from conf file: {}, "
"collected from ctest before filtering: {}").format(
set_to_comma_sep_str(cpp_test_program_filter),
set_to_comma_sep_str(unfiltered_test_programs)))
if not test_programs:
logging.info("Found no test programs")
return []
fine_granularity_gtest_programs = []
one_shot_test_programs = []
for test_program in test_programs:
if is_one_shot_test(test_program):
one_shot_test_programs.append(test_program)
else:
fine_granularity_gtest_programs.append(test_program)
logging.info(("Found {} gtest test programs where tests will be run separately, "
"{} test programs to be run on one shot").format(
len(fine_granularity_gtest_programs),
len(one_shot_test_programs)))
test_programs = fine_granularity_gtest_programs
logging.info(
"Collecting gtest tests for {} test programs where tests will be run separately".format(
len(test_programs)))
start_time_sec = time.time()
all_test_programs = fine_granularity_gtest_programs + one_shot_test_programs
if len(all_test_programs) <= 5: