Skip to content

Commit 795ec27

Browse files
committed
address comments
1 parent ee387ae commit 795ec27

File tree

3 files changed

+25
-9
lines changed

3 files changed

+25
-9
lines changed

examples/src/main/python/streaming/mqtt_wordcount.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
`$ bin/run-example \
2626
org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo`
2727
and then run the example as
28-
`$ bin/spark-submit --driver-class-path external/mqtt-assembly/target/scala-*/\
28+
`$ bin/spark-submit --jars external/mqtt-assembly/target/scala-*/\
2929
spark-streaming-mqtt-assembly-*.jar examples/src/main/python/streaming/mqtt_wordcount.py \
3030
tcp://localhost:1883 foo`
3131
"""

external/mqtt-assembly/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@
2121
<parent>
2222
<groupId>org.apache.spark</groupId>
2323
<artifactId>spark-parent_2.10</artifactId>
24-
<version>1.4.0-SNAPSHOT</version>
24+
<version>1.5.0-SNAPSHOT</version>
2525
<relativePath>../../pom.xml</relativePath>
2626
</parent>
2727

2828
<groupId>org.apache.spark</groupId>
2929
<artifactId>spark-streaming-mqtt-assembly_2.10</artifactId>
3030
<packaging>jar</packaging>
31-
<name>Spark Project External Kafka Assembly</name>
31+
<name>Spark Project External MQTT Assembly</name>
3232
<url>http://spark.apache.org/</url>
3333

3434
<properties>

python/pyspark/streaming/mqtt.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,28 @@ def createStream(ssc, brokerUrl, topic,
4444

4545
try:
4646
jstream = ssc._jvm.MQTTUtils.createStream(ssc._jssc, brokerUrl, topic, jlevel)
47-
4847
except Py4JError, e:
49-
# TODO: use --jar once it also work on driver
50-
if not e.message or 'call a package' in e.message:
51-
print "No Mqtt package, please put the assembly jar into classpath:"
52-
print " $ bin/spark-submit --driver-class-path external/mqtt-assembly/target/" + \
53-
"scala-*/spark-streaming-mqtt-assembly-*.jar"
48+
if 'ClassNotFoundException' in str(e.java_exception):
49+
MQTTUtils._printErrorMsg(ssc.sparkContext)
5450
raise e
5551
return DStream(jstream, ssc, UTF8Deserializer())
52+
53+
@staticmethod
54+
def _printErrorMsg(sc):
55+
print("""
56+
________________________________________________________________________________________________
57+
58+
Spark Streaming's MQTT libraries not found in class path. Try one of the following.
59+
60+
1. Include the MQTT library and its dependencies with in the
61+
spark-submit command as
62+
63+
$ bin/spark-submit --packages org.apache.spark:spark-streaming-mqtt:%s ...
64+
65+
2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
66+
Group Id = org.apache.spark, Artifact Id = spark-streaming-mqtt-assembly, Version = %s.
67+
Then, include the jar in the spark-submit command as
68+
69+
$ bin/spark-submit --jars <spark-streaming-mqtt-assembly.jar> ...
70+
________________________________________________________________________________________________
71+
""" % (sc.version, sc.version))

0 commit comments

Comments
 (0)