Skip to content

Commit b34c3c1

Browse files
committed
adress comments
1 parent 3aa7fff commit b34c3c1

File tree

2 files changed

+5
-8
lines changed

2 files changed

+5
-8
lines changed

examples/src/main/python/streaming/mqtt_wordcount.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,10 @@
4444
sc = SparkContext(appName="PythonStreamingMQTTWordCount")
4545
ssc = StreamingContext(sc, 1)
4646

47-
broker_url = sys.argv[1]
47+
brokerUrl = sys.argv[1]
4848
topic = sys.argv[2]
4949

50-
data = MQTTUtils.createStream(ssc, topic, broker_url)
51-
lines = data.map(lambda x: x[1])
50+
lines = MQTTUtils.createStream(ssc, brokerUrl, topic)
5251
counts = lines.flatMap(lambda line: line.split(" ")) \
5352
.map(lambda word: (word, 1)) \
5453
.reduceByKey(lambda a, b: a+b)

python/pyspark/streaming/mqtt.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from py4j.java_gateway import java_import, Py4JError
2020

2121
from pyspark.storagelevel import StorageLevel
22-
from pyspark.serializers import PairDeserializer, NoOpSerializer
22+
from pyspark.serializers import UTF8Deserializer
2323
from pyspark.streaming import DStream
2424

2525
__all__ = ['MQTTUtils']
@@ -28,7 +28,7 @@
2828
class MQTTUtils(object):
2929

3030
@staticmethod
31-
def createStream(ssc, brokerUrl, topic
31+
def createStream(ssc, brokerUrl, topic,
3232
storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
3333
"""
3434
Create an input stream that pulls messages from a Mqtt Broker.
@@ -52,6 +52,4 @@ def createStream(ssc, brokerUrl, topic
5252
print " $ bin/spark-submit --driver-class-path external/mqtt-assembly/target/" + \
5353
"scala-*/spark-streaming-mqtt-assembly-*.jar"
5454
raise e
55-
ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
56-
stream = DStream(jstream, ssc, ser)
57-
return stream
55+
return DStream(jstream, ssc, UTF8Deserializer())

0 commit comments

Comments
 (0)