Skip to content

Commit 09aeee1

Browse files
committed
Do not restart receiver when stopped
1 parent 852f4de commit 09aeee1

File tree

1 file changed

+8
-3
lines changed

1 file changed

+8
-3
lines changed

streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.streaming.dstream
1919

20+
import scala.util.control.NonFatal
21+
2022
import org.apache.spark.streaming.StreamingContext
2123
import org.apache.spark.storage.StorageLevel
2224
import org.apache.spark.util.NextIterator
@@ -74,13 +76,16 @@ class SocketReceiver[T: ClassTag](
7476
while(!isStopped && iterator.hasNext) {
7577
store(iterator.next)
7678
}
79+
if (!isStopped()) {
80+
restart("Socket data stream had no more data")
81+
}
7782
logInfo("Stopped receiving")
78-
restart("Retrying connecting to " + host + ":" + port)
7983
} catch {
8084
case e: java.net.ConnectException =>
8185
restart("Error connecting to " + host + ":" + port, e)
82-
case t: Throwable =>
83-
restart("Error receiving data", t)
86+
case NonFatal(e) =>
87+
logWarning("Error receiving data", e)
88+
restart("Error receiving data", e)
8489
} finally {
8590
if (socket != null) {
8691
socket.close()

0 commit comments

Comments
 (0)