Skip to content
56 changes: 44 additions & 12 deletions python/pyspark/streaming/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -971,8 +971,10 @@ def test_kinesis_stream_api(self):
"awsAccessKey", "awsSecretKey")

def test_kinesis_stream(self):
if os.environ.get('ENABLE_KINESIS_TESTS') != '1':
print("Skip test_kinesis_stream")
if not are_kinesis_tests_enabled:
sys.stderr.write(
"Skipped test_kinesis_stream (enable by setting environment variable %s=1"
% kinesis_test_environ_var)
return

import random
Expand Down Expand Up @@ -1013,6 +1015,7 @@ def get_output(_, rdd):
traceback.print_exc()
raise
finally:
self.ssc.stop(False)
kinesisTestUtils.deleteStream()
kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)

Expand All @@ -1027,7 +1030,7 @@ def search_kafka_assembly_jar():
("Failed to find Spark Streaming kafka assembly jar in %s. " % kafka_assembly_dir) +
"You need to build Spark with "
"'build/sbt assembly/assembly streaming-kafka-assembly/assembly' or "
"'build/mvn package' before running this test")
"'build/mvn package' before running this test.")
elif len(jars) > 1:
raise Exception(("Found multiple Spark Streaming Kafka assembly JARs in %s; please "
"remove all but one") % kafka_assembly_dir)
Expand All @@ -1045,7 +1048,7 @@ def search_flume_assembly_jar():
("Failed to find Spark Streaming Flume assembly jar in %s. " % flume_assembly_dir) +
"You need to build Spark with "
"'build/sbt assembly/assembly streaming-flume-assembly/assembly' or "
"'build/mvn package' before running this test")
"'build/mvn package' before running this test.")
elif len(jars) > 1:
raise Exception(("Found multiple Spark Streaming Flume assembly JARs in %s; please "
"remove all but one") % flume_assembly_dir)
Expand Down Expand Up @@ -1095,27 +1098,56 @@ def search_kinesis_asl_assembly_jar():
os.path.join(kinesis_asl_assembly_dir,
"target/scala-*/spark-streaming-kinesis-asl-assembly-*.jar"))
if not jars:
raise Exception(
("Failed to find Spark Streaming Kinesis ASL assembly jar in %s. " %
kinesis_asl_assembly_dir) + "You need to build Spark with "
"'build/sbt -Pkinesis-asl assembly/assembly streaming-kinesis-asl-assembly/assembly' "
"or 'build/mvn -Pkinesis-asl package' before running this test")
return None
elif len(jars) > 1:
raise Exception(("Found multiple Spark Streaming Kinesis ASL assembly JARs in %s; please "
"remove all but one") % kinesis_asl_assembly_dir)
else:
return jars[0]


# Opt-in switch for the Kinesis tests: the tests count as enabled only when the
# environment variable named below is set to exactly '1'.
# Must be same as the variable and condition defined in KinesisTestUtils.scala
kinesis_test_environ_var = "ENABLE_KINESIS_TESTS"
are_kinesis_tests_enabled = os.environ.get(kinesis_test_environ_var) == '1'

if __name__ == "__main__":
kafka_assembly_jar = search_kafka_assembly_jar()
flume_assembly_jar = search_flume_assembly_jar()
mqtt_assembly_jar = search_mqtt_assembly_jar()
mqtt_test_jar = search_mqtt_test_jar()
kinesis_asl_assembly_jar = search_kinesis_asl_assembly_jar()

jars = "%s,%s,%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, kinesis_asl_assembly_jar,
mqtt_assembly_jar, mqtt_test_jar)
if kinesis_asl_assembly_jar is None:
kinesis_jar_present = False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If ENABLE_KINESIS_TESTS is set to 1 but kinesis_jar_present is false, I think we should fail the test, because that would indicate a bug: the assembly jar could not be generated correctly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a good point. If that flag is set, then the user definitely wants to run ALL the Kinesis tests. In that case it's best to fail.

jars = "%s,%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, mqtt_assembly_jar,
mqtt_test_jar)
else:
kinesis_jar_present = True
jars = "%s,%s,%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, mqtt_assembly_jar,
mqtt_test_jar, kinesis_asl_assembly_jar)

os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
unittest.main()
# Suites that always run. The Kinesis suite is appended below only when its
# assembly jar was found.
testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests,
             CheckpointTests, KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests]

if kinesis_jar_present:
    testcases.append(KinesisStreamTests)
elif not are_kinesis_tests_enabled:
    # Jar missing and the tests were not requested: skip with an explanation.
    sys.stderr.write("Skipping all Kinesis Python tests as the optional Kinesis project was "
                     "not compiled with -Pkinesis-asl profile. To run these tests, "
                     "you need to build Spark with 'build/sbt -Pkinesis-asl assembly/assembly "
                     "streaming-kinesis-asl-assembly/assembly' or "
                     "'build/mvn -Pkinesis-asl package' before running this test.\n")
else:
    # Jar missing although the tests were explicitly requested: fail loudly
    # instead of silently skipping.
    # NOTE(review): kinesis_asl_assembly_dir is not assigned at module scope in
    # the visible code -- confirm it is defined here, otherwise this line raises
    # NameError instead of the intended message.
    raise Exception(
        ("Failed to find Spark Streaming Kinesis assembly jar in %s. "
         % kinesis_asl_assembly_dir) +
        "You need to build Spark with 'build/sbt -Pkinesis-asl "
        "assembly/assembly streaming-kinesis-asl-assembly/assembly' "
        "or 'build/mvn -Pkinesis-asl package' before running this test.")

sys.stderr.write("Running tests: %s \n" % (str(testcases)))
# Track failures across suites so the process exit status reflects them.
# Without this the script exits 0 even when tests fail.
failed = False
for testcase in testcases:
    sys.stderr.write("[Running %s]\n" % (testcase))
    tests = unittest.TestLoader().loadTestsFromTestCase(testcase)
    result = unittest.TextTestRunner(verbosity=2).run(tests)
    if not result.wasSuccessful():
        failed = True
sys.exit(1 if failed else 0)