
Commit 87fc677

Author: Prabeesh K
Address the review comments:
- keep the whole MQTTTestUtils in test, and link to the test jar from Python
- fix the issue under the Maven build
- return JavaDStream[String] directly
1 parent: 97244ec

File tree: 6 files changed (+55, −24 lines)


dev/run-tests.py

Lines changed: 2 additions & 1 deletion
@@ -295,7 +295,8 @@ def build_spark_sbt(hadoop_version):
                  "assembly/assembly",
                  "streaming-kafka-assembly/assembly",
                  "streaming-flume-assembly/assembly",
-                 "streaming-mqtt-assembly/assembly"]
+                 "streaming-mqtt-assembly/assembly",
+                 "streaming-mqtt/test:assembly"]
     profiles_and_goals = build_profiles + sbt_goals

     print("[info] Building Spark (w/Hive 0.13.1) using SBT with these arguments: ",

external/mqtt-assembly/pom.xml

Lines changed: 1 addition & 0 deletions
@@ -58,6 +58,7 @@
         <artifactId>maven-shade-plugin</artifactId>
         <configuration>
           <shadedArtifactAttached>false</shadedArtifactAttached>
+          <outputFile>${project.build.directory}/scala-${scala.binary.version}/spark-streaming-mqtt-assembly-${project.version}.jar</outputFile>
           <artifactSet>
             <includes>
               <include>*:*</include>
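Pinning <outputFile> this way presumably addresses the "fix issue under Maven build" item: with the shaded jar landing under target/scala-<version>/ for Maven as well as sbt, a single glob on the Python side can locate it under either build. A sketch of that lookup (cf. search_mqtt_assembly_jar in python/pyspark/streaming/tests.py):

import glob
import os

# Sketch: the <outputFile> setting above pins the shaded jar under
# target/scala-<version>/, so the same glob works for both build systems.
SPARK_HOME = os.environ["SPARK_HOME"]
jars = glob.glob(os.path.join(
    SPARK_HOME, "external/mqtt-assembly",
    "target/scala-*/spark-streaming-mqtt-assembly-*.jar"))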

external/mqtt/pom.xml

Lines changed: 1 addition & 0 deletions
@@ -72,6 +72,7 @@
       <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-core</artifactId>
       <version>5.7.0</version>
+      <scope>test</scope>
     </dependency>
   </dependencies>
   <build>

With MQTTTestUtils moved under src/test (see the rename below), the embedded ActiveMQ broker is only needed on the test classpath, hence the test scope.

external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala

Lines changed: 2 additions & 7 deletions
@@ -87,12 +87,7 @@ private class MQTTUtilsPythonHelper {
       brokerUrl: String,
       topic: String,
       storageLevel: StorageLevel
-    ): JavaDStream[Array[Byte]] = {
-    val dstream = MQTTUtils.createStream(jssc, brokerUrl, topic, storageLevel)
-    dstream.map(new Function[String, Array[Byte]] {
-      override def call(data: String): Array[Byte] = {
-        data.getBytes("UTF-8")
-      }
-    })
+    ): JavaDStream[String] = {
+    MQTTUtils.createStream(jssc, brokerUrl, topic, storageLevel)
   }
 }
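Returning JavaDStream[String] directly drops the explicit byte-mapping on the JVM side; the Python wrapper can instead attach a UTF-8 deserializer to the stream it gets back over py4j. A minimal sketch of how the Python side consumes this helper (the function name and the `helper` parameter are illustrative; the real wrapper is MQTTUtils.createStream in pyspark/streaming/mqtt.py):

from pyspark.serializers import UTF8Deserializer
from pyspark.streaming.dstream import DStream

def create_mqtt_stream(helper, ssc, brokerUrl, topic, storageLevel):
    # `helper` stands in for the py4j handle to MQTTUtilsPythonHelper.
    jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
    # createStream now hands back a JavaDStream[String]; its records cross
    # the py4j boundary as UTF-8 bytes, which UTF8Deserializer decodes.
    jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel)
    return DStream(jstream, ssc, UTF8Deserializer())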

external/mqtt/src/{main → test}/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala (renamed)

Lines changed: 3 additions & 3 deletions
@@ -18,7 +18,7 @@
 package org.apache.spark.streaming.mqtt

 import java.net.{ServerSocket, URI}
-import java.util.concurrent.{TimeUnit, CountDownLatch}
+import java.util.concurrent.{CountDownLatch, TimeUnit}

 import scala.language.postfixOps

@@ -27,7 +27,7 @@ import org.apache.commons.lang3.RandomUtils
 import org.eclipse.paho.client.mqttv3._
 import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence

-import org.apache.spark.streaming.{StreamingContext, Milliseconds}
+import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.scheduler.StreamingListener
 import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
 import org.apache.spark.util.Utils

@@ -40,7 +40,7 @@ private class MQTTTestUtils extends Logging {

   private val persistenceDir = Utils.createTempDir()
   private val brokerHost = "localhost"
-  private var brokerPort = findFreePort()
+  private val brokerPort = findFreePort()

   private var broker: BrokerService = _
   private var connector: TransportConnector = _
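brokerPort becomes a val since findFreePort() is evaluated once when the test utils are constructed. Judging from the java.net.ServerSocket import, the helper uses the standard bind-to-port-0 trick; a quick Python rendition of that pattern, for reference (a sketch, not the Scala code in MQTTTestUtils):

import socket

def find_free_port(host="localhost"):
    # Bind to port 0 so the OS assigns an unused ephemeral port, then
    # release the socket and hand the port number to the broker.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((host, 0))
        return s.getsockname()[1]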

python/pyspark/streaming/tests.py

Lines changed: 46 additions & 13 deletions
@@ -850,28 +850,43 @@ def tearDown(self):
     def _randomTopic(self):
         return "topic-%d" % random.randint(0, 10000)

-    def _validateStreamResult(self, sendData, dstream):
+    def _startContext(self, topic):
+        # Start the StreamingContext and also collect the result
+        stream = MQTTUtils.createStream(self.ssc, "tcp://" + self._MQTTTestUtils.brokerUri(), topic)
         result = []

-        def get_output(_, rdd):
+        def getOutput(_, rdd):
             for data in rdd.collect():
                 result.append(data)

-        dstream.foreachRDD(get_output)
-        receiveData = ' '.join(result[0])
+        stream.foreachRDD(getOutput)
+        self.ssc.start()
+        return result
+
+    def _publishData(self, topic, data):
+        start_time = time.time()
+        while True:
+            try:
+                self._MQTTTestUtils.publishData(topic, data)
+                break
+            except:
+                if time.time() - start_time < self.timeout:
+                    time.sleep(0.01)
+                else:
+                    raise
+
+    def _validateStreamResult(self, sendData, result):
+        receiveData = ''.join(result[0])
         self.assertEqual(sendData, receiveData)

     def test_mqtt_stream(self):
         """Test the Python MQTT stream API."""
-        topic = self._randomTopic()
         sendData = "MQTT demo for spark streaming"
-        ssc = self.ssc
-
-        self._MQTTTestUtils.waitForReceiverToStart(ssc)
-        self._MQTTTestUtils.publishData(topic, sendData)
-
-        stream = MQTTUtils.createStream(ssc, "tcp://" + self._MQTTTestUtils.brokerUri(), topic)
-        self._validateStreamResult(sendData, stream)
+        topic = self._randomTopic()
+        result = self._startContext(topic)
+        self._publishData(topic, sendData)
+        self.wait_for(result, len(sendData))
+        self._validateStreamResult(sendData, result)


 def search_kafka_assembly_jar():
@@ -928,11 +943,29 @@ def search_mqtt_assembly_jar():
         return jars[0]


+def search_mqtt_test_jar():
+    SPARK_HOME = os.environ["SPARK_HOME"]
+    mqtt_test_dir = os.path.join(SPARK_HOME, "external/mqtt")
+    jars = glob.glob(
+        os.path.join(mqtt_test_dir, "target/scala-*/spark-streaming-mqtt-test-*.jar"))
+    if not jars:
+        raise Exception(
+            ("Failed to find Spark Streaming MQTT test jar in %s. " % mqtt_test_dir) +
+            "You need to build Spark with "
+            "'build/sbt assembly/assembly streaming-mqtt/test:assembly'")
+    elif len(jars) > 1:
+        raise Exception(("Found multiple Spark Streaming MQTT test JARs in %s; please "
+                         "remove all but one") % mqtt_test_dir)
+    else:
+        return jars[0]
+
 if __name__ == "__main__":
     kafka_assembly_jar = search_kafka_assembly_jar()
     flume_assembly_jar = search_flume_assembly_jar()
     mqtt_assembly_jar = search_mqtt_assembly_jar()
-    jars = "%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, mqtt_assembly_jar)
+    mqtt_test_jar = search_mqtt_test_jar()
+    jars = "%s,%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar,
+                            mqtt_assembly_jar, mqtt_test_jar)

     os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
     unittest.main()
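The reworked test polls instead of relying on waitForReceiverToStart: _publishData retries until the receiver is subscribed, and self.wait_for blocks until enough records have been collected. wait_for is defined elsewhere in tests.py; its assumed shape, for context (a sketch under that assumption, not the verified definition):

import time

def wait_for(self, result, n):
    # Poll until `result` (appended to by foreachRDD on the streaming
    # thread) holds n records, or give up after self.timeout seconds.
    start_time = time.time()
    while len(result) < n and time.time() - start_time < self.timeout:
        time.sleep(0.01)
    if len(result) < n:
        print("timeout after", self.timeout)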
