File tree Expand file tree Collapse file tree 1 file changed +5
-2
lines changed
src/main/scala/org/apache/spark/sql/pulsar Expand file tree Collapse file tree 1 file changed +5
-2
lines changed Original file line number Diff line number Diff line change @@ -70,7 +70,7 @@ private[pulsar] abstract class PulsarSourceRDDBase(
7070 .loadConf(readerConf)
7171 .create()
7272
73- new NextIterator [InternalRow ] {
73+ val iter = new NextIterator [InternalRow ] {
7474
7575 private var inEnd : Boolean = false
7676 private var isLast : Boolean = false
@@ -150,7 +150,6 @@ private[pulsar] abstract class PulsarSourceRDDBase(
150150 } catch {
151151 case e : PulsarClientException =>
152152 logError(s " PulsarClient failed to read message from topic $topic" , e)
153- close()
154153 throw e
155154 case e : Throwable =>
156155 throw e
@@ -161,6 +160,10 @@ private[pulsar] abstract class PulsarSourceRDDBase(
161160 reader.close()
162161 }
163162 }
163+ context.addTaskCompletionListener[Unit ] { _ =>
164+ iter.closeIfNeeded()
165+ }
166+ iter
164167 }
165168}
166169
You can’t perform that action at this time.
0 commit comments