Skip to content

Commit a1c44f6

Browse files
tdasCodingCat
authored andcommitted
[SPARK-9640] [STREAMING] [TEST] Do not run Python Kinesis tests when the Kinesis assembly JAR has not been generated
Author: Tathagata Das <tathagata.das1565@gmail.com> Closes apache#7961 from tdas/SPARK-9640 and squashes the following commits: 974ce19 [Tathagata Das] Undo changes related to SPARK-9727 004ae26 [Tathagata Das] style fixes 9bbb97d [Tathagata Das] Minor style fies e6a677e [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-9640 ca90719 [Tathagata Das] Removed extra line ba9cfc7 [Tathagata Das] Improved kinesis test selection logic 88d59bd [Tathagata Das] updated test modules 871fcc8 [Tathagata Das] Fixed SparkBuild 94be631 [Tathagata Das] Fixed style b858196 [Tathagata Das] Fixed conditions and few other things based on PR comments. e292e64 [Tathagata Das] Added filters for Kinesis python tests
1 parent 52c214d commit a1c44f6

File tree

1 file changed

+44
-12
lines changed

1 file changed

+44
-12
lines changed

python/pyspark/streaming/tests.py

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -971,8 +971,10 @@ def test_kinesis_stream_api(self):
971971
"awsAccessKey", "awsSecretKey")
972972

973973
def test_kinesis_stream(self):
974-
if os.environ.get('ENABLE_KINESIS_TESTS') != '1':
975-
print("Skip test_kinesis_stream")
974+
if not are_kinesis_tests_enabled:
975+
sys.stderr.write(
976+
"Skipped test_kinesis_stream (enable by setting environment variable %s=1"
977+
% kinesis_test_environ_var)
976978
return
977979

978980
import random
@@ -1013,6 +1015,7 @@ def get_output(_, rdd):
10131015
traceback.print_exc()
10141016
raise
10151017
finally:
1018+
self.ssc.stop(False)
10161019
kinesisTestUtils.deleteStream()
10171020
kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
10181021

@@ -1027,7 +1030,7 @@ def search_kafka_assembly_jar():
10271030
("Failed to find Spark Streaming kafka assembly jar in %s. " % kafka_assembly_dir) +
10281031
"You need to build Spark with "
10291032
"'build/sbt assembly/assembly streaming-kafka-assembly/assembly' or "
1030-
"'build/mvn package' before running this test")
1033+
"'build/mvn package' before running this test.")
10311034
elif len(jars) > 1:
10321035
raise Exception(("Found multiple Spark Streaming Kafka assembly JARs in %s; please "
10331036
"remove all but one") % kafka_assembly_dir)
@@ -1045,7 +1048,7 @@ def search_flume_assembly_jar():
10451048
("Failed to find Spark Streaming Flume assembly jar in %s. " % flume_assembly_dir) +
10461049
"You need to build Spark with "
10471050
"'build/sbt assembly/assembly streaming-flume-assembly/assembly' or "
1048-
"'build/mvn package' before running this test")
1051+
"'build/mvn package' before running this test.")
10491052
elif len(jars) > 1:
10501053
raise Exception(("Found multiple Spark Streaming Flume assembly JARs in %s; please "
10511054
"remove all but one") % flume_assembly_dir)
@@ -1095,27 +1098,56 @@ def search_kinesis_asl_assembly_jar():
10951098
os.path.join(kinesis_asl_assembly_dir,
10961099
"target/scala-*/spark-streaming-kinesis-asl-assembly-*.jar"))
10971100
if not jars:
1098-
raise Exception(
1099-
("Failed to find Spark Streaming Kinesis ASL assembly jar in %s. " %
1100-
kinesis_asl_assembly_dir) + "You need to build Spark with "
1101-
"'build/sbt -Pkinesis-asl assembly/assembly streaming-kinesis-asl-assembly/assembly' "
1102-
"or 'build/mvn -Pkinesis-asl package' before running this test")
1101+
return None
11031102
elif len(jars) > 1:
11041103
raise Exception(("Found multiple Spark Streaming Kinesis ASL assembly JARs in %s; please "
11051104
"remove all but one") % kinesis_asl_assembly_dir)
11061105
else:
11071106
return jars[0]
11081107

11091108

1109+
# Must be same as the variable and condition defined in KinesisTestUtils.scala
1110+
kinesis_test_environ_var = "ENABLE_KINESIS_TESTS"
1111+
are_kinesis_tests_enabled = os.environ.get(kinesis_test_environ_var) == '1'
1112+
11101113
if __name__ == "__main__":
11111114
kafka_assembly_jar = search_kafka_assembly_jar()
11121115
flume_assembly_jar = search_flume_assembly_jar()
11131116
mqtt_assembly_jar = search_mqtt_assembly_jar()
11141117
mqtt_test_jar = search_mqtt_test_jar()
11151118
kinesis_asl_assembly_jar = search_kinesis_asl_assembly_jar()
11161119

1117-
jars = "%s,%s,%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, kinesis_asl_assembly_jar,
1118-
mqtt_assembly_jar, mqtt_test_jar)
1120+
if kinesis_asl_assembly_jar is None:
1121+
kinesis_jar_present = False
1122+
jars = "%s,%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, mqtt_assembly_jar,
1123+
mqtt_test_jar)
1124+
else:
1125+
kinesis_jar_present = True
1126+
jars = "%s,%s,%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, mqtt_assembly_jar,
1127+
mqtt_test_jar, kinesis_asl_assembly_jar)
11191128

11201129
os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
1121-
unittest.main()
1130+
testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests,
1131+
CheckpointTests, KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests]
1132+
1133+
if kinesis_jar_present is True:
1134+
testcases.append(KinesisStreamTests)
1135+
elif are_kinesis_tests_enabled is False:
1136+
sys.stderr.write("Skipping all Kinesis Python tests as the optional Kinesis project was "
1137+
"not compiled with -Pkinesis-asl profile. To run these tests, "
1138+
"you need to build Spark with 'build/sbt -Pkinesis-asl assembly/assembly "
1139+
"streaming-kinesis-asl-assembly/assembly' or "
1140+
"'build/mvn -Pkinesis-asl package' before running this test.")
1141+
else:
1142+
raise Exception(
1143+
("Failed to find Spark Streaming Kinesis assembly jar in %s. "
1144+
% kinesis_asl_assembly_dir) +
1145+
"You need to build Spark with 'build/sbt -Pkinesis-asl "
1146+
"assembly/assembly streaming-kinesis-asl-assembly/assembly'"
1147+
"or 'build/mvn -Pkinesis-asl package' before running this test.")
1148+
1149+
sys.stderr.write("Running tests: %s \n" % (str(testcases)))
1150+
for testcase in testcases:
1151+
sys.stderr.write("[Running %s]\n" % (testcase))
1152+
tests = unittest.TestLoader().loadTestsFromTestCase(testcase)
1153+
unittest.TextTestRunner(verbosity=2).run(tests)

0 commit comments

Comments
 (0)