Skip to content

Commit e1ee016

Browse files
committed
scala style fix
1 parent a5a8f9f commit e1ee016

File tree

2 files changed

+5
-3
lines changed

2 files changed

+5
-3
lines changed

external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ private class MQTTTestUtils extends Logging {
111111
/**
112112
* Block until at least one receiver has started or timeout occurs.
113113
*/
114-
def waitForReceiverToStart(ssc: StreamingContext) = {
114+
def waitForReceiverToStart(ssc: StreamingContext) : Unit = {
115115
val latch = new CountDownLatch(1)
116116
ssc.addStreamingListener(new StreamingListener {
117117
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {

external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala

+4-2
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,10 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfterA
5454
val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
5555
ssc = new StreamingContext(sparkConf, Milliseconds(500))
5656
val sendMessage = "MQTT demo for spark streaming"
57-
val receiveStream =
58-
MQTTUtils.createStream(ssc, "tcp://" + MQTTTestUtils.brokerUri, topic, StorageLevel.MEMORY_ONLY)
57+
58+
val receiveStream = MQTTUtils.createStream(ssc, "tcp://" + MQTTTestUtils.brokerUri, topic,
59+
StorageLevel.MEMORY_ONLY)
60+
5961
@volatile var receiveMessage: List[String] = List()
6062
receiveStream.foreachRDD { rdd =>
6163
if (rdd.collect.length > 0) {

0 commit comments

Comments
 (0)