@@ -827,6 +827,7 @@ def test_flume_polling(self):
827827 def test_flume_polling_multiple_hosts (self ):
828828 self ._testMultipleTimes (self ._testFlumePollingMultipleHosts )
829829
830+
830831class MQTTStreamTests (PySparkStreamingTestCase ):
831832 timeout = 20 # seconds
832833 duration = 1
@@ -841,8 +842,8 @@ def setUp(self):
841842
842843 def tearDown (self ):
843844 if self ._MQTTTestUtils is not None :
844- self ._MQTTTestUtils .teardown ()
845- self ._MQTTTestUtils = None
845+ self ._MQTTTestUtils .teardown ()
846+ self ._MQTTTestUtils = None
846847
847848 super (MQTTStreamTests , self ).tearDown ()
848849
@@ -905,10 +906,11 @@ def search_flume_assembly_jar():
905906 "'build/mvn package' before running this test" )
906907 elif len (jars ) > 1 :
907908 raise Exception (("Found multiple Spark Streaming Flume assembly JARs in %s; please "
908- "remove all but one" ) % flume_assembly_dir )
909+ "remove all but one" ) % flume_assembly_dir )
909910 else :
910911 return jars [0 ]
911912
913+
912914def search_mqtt_assembly_jar ():
913915 SPARK_HOME = os .environ ["SPARK_HOME" ]
914916 mqtt_assembly_dir = os .path .join (SPARK_HOME , "external/mqtt-assembly" )
@@ -926,6 +928,7 @@ def search_mqtt_assembly_jar():
926928 else :
927929 return jars [0 ]
928930
931+
929932if __name__ == "__main__" :
930933 kafka_assembly_jar = search_kafka_assembly_jar ()
931934 flume_assembly_jar = search_flume_assembly_jar ()
0 commit comments