@@ -908,8 +908,10 @@ def test_kinesis_stream_api(self):
908908 "awsAccessKey" , "awsSecretKey" )
909909
910910 def test_kinesis_stream (self ):
911- if os .environ .get ('ENABLE_KINESIS_TESTS' ) != '1' :
912- print ("Skip test_kinesis_stream" )
911+ if not are_kinesis_tests_enabled :
912+ sys .stderr .write (
913+ "Skipped test_kinesis_stream (enable by setting environment variable %s=1"
914+ % kinesis_test_environ_var )
913915 return
914916
915917 import random
@@ -950,6 +952,7 @@ def get_output(_, rdd):
950952 traceback .print_exc ()
951953 raise
952954 finally :
955+ self .ssc .stop (False )
953956 kinesisTestUtils .deleteStream ()
954957 kinesisTestUtils .deleteDynamoDBTable (kinesisAppName )
955958
@@ -964,7 +967,7 @@ def search_kafka_assembly_jar():
964967 ("Failed to find Spark Streaming kafka assembly jar in %s. " % kafka_assembly_dir ) +
965968 "You need to build Spark with "
966969 "'build/sbt assembly/assembly streaming-kafka-assembly/assembly' or "
967- "'build/mvn package' before running this test" )
970+ "'build/mvn package' before running this test. " )
968971 elif len (jars ) > 1 :
969972 raise Exception (("Found multiple Spark Streaming Kafka assembly JARs in %s; please "
970973 "remove all but one" ) % kafka_assembly_dir )
@@ -982,7 +985,7 @@ def search_flume_assembly_jar():
982985 ("Failed to find Spark Streaming Flume assembly jar in %s. " % flume_assembly_dir ) +
983986 "You need to build Spark with "
984987 "'build/sbt assembly/assembly streaming-flume-assembly/assembly' or "
985- "'build/mvn package' before running this test" )
988+ "'build/mvn package' before running this test. " )
986989 elif len (jars ) > 1 :
987990 raise Exception (("Found multiple Spark Streaming Flume assembly JARs in %s; please "
988991 "remove all but one" ) % flume_assembly_dir )
@@ -997,14 +1000,26 @@ def search_kinesis_asl_assembly_jar():
9971000 os .path .join (kinesis_asl_assembly_dir ,
9981001 "target/scala-*/spark-streaming-kinesis-asl-assembly-*.jar" ))
9991002 if not jars :
1000- return None
1003+ if are_kinesis_tests_enabled :
1004+ raise Exception (
1005+ ("Failed to find Spark Streaming Kinesis assembly jar in %s. " %
1006+ kinesis_asl_assembly_dir ) + "You need to build Spark with 'build/sbt -Pkinesis-asl "
1007+ "assembly/assembly streaming-kinesis-asl-assembly/assembly'"
1008+ "or 'build/mvn -Pkinesis-asl package' before running this test." )
1009+ else :
1010+ return None
10011011 elif len (jars ) > 1 :
10021012 raise Exception (("Found multiple Spark Streaming Kinesis ASL assembly JARs in %s; please "
10031013 "remove all but one" ) % kinesis_asl_assembly_dir )
10041014 else :
10051015 return jars [0 ]
10061016
10071017
1018+ # Must be same as the variable and condition defined in KinesisTestUtils.scala
1019+ kinesis_test_environ_var = "ENABLE_KINESIS_TESTS"
1020+ are_kinesis_tests_enabled = os .environ .get (kinesis_test_environ_var ) == '1'
1021+
1022+
10081023if __name__ == "__main__" :
10091024 kafka_assembly_jar = search_kafka_assembly_jar ()
10101025 flume_assembly_jar = search_flume_assembly_jar ()
@@ -1016,15 +1031,16 @@ def search_kinesis_asl_assembly_jar():
10161031 kinesis_jar_present = True
10171032 jars = "%s,%s,%s" % (kafka_assembly_jar , flume_assembly_jar , kinesis_asl_assembly_jar )
10181033
1019- print kinesis_jar_present , jars
10201034 os .environ ["PYSPARK_SUBMIT_ARGS" ] = "--jars %s pyspark-shell" % jars
10211035 testcases = [BasicOperationTests , WindowFunctionTests , StreamingContextTests , \
10221036 CheckpointTests , KafkaStreamTests , FlumeStreamTests , FlumePollingStreamTests ]
10231037 if kinesis_jar_present is True :
10241038 testcases .append (KinesisStreamTests )
1039+ else :
1040+ sys .stderr .write ("Skipping all Kinesis Python tests as the "
1041+ "optional Kinesis project was not compiled" )
10251042
1026- sys .stderr .write ("Running tests %s\n " % (str (testcases )))
10271043 for testcase in testcases :
1028- print "[" , testcase , "]"
1044+ sys . stderr . write ( "[Running %s]" % ( testcase ))
10291045 tests = unittest .TestLoader ().loadTestsFromTestCase (testcase )
10301046 unittest .TextTestRunner (verbosity = 2 ).run (tests )
0 commit comments