Skip to content

Commit 9767d82

Browse files
committed
implemented Python-friendly class
1 parent a11968b commit 9767d82

File tree

2 files changed

+27
-5
lines changed

2 files changed

+27
-5
lines changed

external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.mqtt
1919

2020
import scala.reflect.ClassTag
2121

22+
import org.apache.spark.api.java.function.Function
2223
import org.apache.spark.storage.StorageLevel
2324
import org.apache.spark.streaming.StreamingContext
2425
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext, JavaDStream}
@@ -74,3 +75,24 @@ object MQTTUtils {
7475
createStream(jssc.ssc, brokerUrl, topic, storageLevel)
7576
}
7677
}
78+
79+
/**
80+
* This is a helper class that wraps the methods in MQTTUtils into more Python-friendly class and
81+
* function so that it can be easily instantiated and called from Python's MQTTUtils.
82+
*/
83+
private class MQTTUtilsPythonHelper {
84+
85+
def createStream(
86+
jssc: JavaStreamingContext,
87+
brokerUrl: String,
88+
topic: String,
89+
storageLevel: StorageLevel
90+
): JavaDStream[Array[Byte]] = {
91+
val dstream = MQTTUtils.createStream(jssc, brokerUrl, topic, storageLevel)
92+
dstream.map(new Function[String, Array[Byte]] {
93+
override def call(data: String): Array[Byte] = {
94+
data.getBytes("UTF-8")
95+
}
96+
})
97+
}
98+
}

python/pyspark/streaming/mqtt.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@
1515
# limitations under the License.
1616
#
1717

18-
from py4j.java_collections import MapConverter
19-
from py4j.java_gateway import java_import, Py4JJavaError
18+
from py4j.java_gateway import Py4JJavaError
2019

2120
from pyspark.storagelevel import StorageLevel
2221
from pyspark.serializers import UTF8Deserializer
@@ -38,12 +37,13 @@ def createStream(ssc, brokerUrl, topic,
3837
:param storageLevel: RDD storage level.
3938
:return: A DStream object
4039
"""
41-
java_import(ssc._jvm, "org.apache.spark.streaming.mqtt.MQTTUtils")
42-
4340
jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
4441

4542
try:
46-
jstream = ssc._jvm.MQTTUtils.createStream(ssc._jssc, brokerUrl, topic, jlevel)
43+
helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
44+
.loadClass("org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper")
45+
helper = helperClass.newInstance()
46+
jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel)
4747
except Py4JJavaError as e:
4848
if 'ClassNotFoundException' in str(e.java_exception):
4949
MQTTUtils._printErrorMsg(ssc.sparkContext)

0 commit comments

Comments
 (0)