@@ -68,7 +68,7 @@ private[spark] class PythonRDD(
     // Start a thread to feed the process input from our parent's iterator
     val writerThread = new WriterThread(env, worker, split, context)

-    context.addOnCompleteCallback { () =>
+    context.addTaskCompletionListener { context =>
       writerThread.shutdownOnTaskCompletion()

       // Cleanup the worker socket. This will also cause the Python worker to exit.
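
Note on this first hunk: in Spark 1.2+ TaskContext exposes addTaskCompletionListener(f: TaskContext => Unit) in place of the older addOnCompleteCallback(() => Unit); the callback now receives the TaskContext instead of taking no arguments. A minimal sketch of registering a cleanup hook under the new API (the registerCleanup helper is illustrative, not part of this patch):

    import org.apache.spark.TaskContext

    // Illustrative helper: register a cleanup action that runs when the task
    // finishes, whether it succeeded, failed, or was killed.
    def registerCleanup(context: TaskContext, cleanup: () => Unit): Unit = {
      context.addTaskCompletionListener { (ctx: TaskContext) =>
        cleanup()
      }
    }
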
@@ -137,7 +137,7 @@ private[spark] class PythonRDD(
         }
       } catch {

-        case e: Exception if context.interrupted =>
+        case e: Exception if context.isInterrupted =>
           logDebug("Exception thrown after task interruption", e)
           throw new TaskKilledException

@@ -176,7 +176,7 @@ private[spark] class PythonRDD(

     /** Terminates the writer thread, ignoring any exceptions that may occur due to cleanup. */
     def shutdownOnTaskCompletion() {
-      assert(context.completed)
+      assert(context.isCompleted)
       this.interrupt()
     }

@@ -209,7 +209,7 @@ private[spark] class PythonRDD(
         PythonRDD.writeIteratorToStream(parent.iterator(split, context), dataOut)
         dataOut.flush()
       } catch {
-        case e: Exception if context.completed || context.interrupted =>
+        case e: Exception if context.isCompleted || context.isInterrupted =>
           logDebug("Exception thrown after task completion (likely due to cleanup)", e)

         case e: Exception =>
@@ -235,10 +235,10 @@ private[spark] class PythonRDD(
     override def run() {
       // Kill the worker if it is interrupted, checking until task completion.
       // TODO: This has a race condition if interruption occurs, as completed may still become true.
-      while (!context.interrupted && !context.completed) {
+      while (!context.isInterrupted && !context.isCompleted) {
         Thread.sleep(2000)
       }
-      if (!context.completed) {
+      if (!context.isCompleted) {
         try {
           logWarning("Incomplete task interrupted: Attempting to kill Python Worker")
           env.destroyPythonWorker(pythonExec, envVars.toMap, worker)
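
The MonitorThread hunk above keeps the same polling logic, only against the renamed accessors. A hedged sketch of the pattern, assuming a TaskContext is in scope (the monitor and killWorker names are illustrative, not Spark API):

    import org.apache.spark.TaskContext

    // Illustrative watchdog: poll the task state every two seconds and tear
    // down an external worker only if the task was interrupted before completing.
    def monitor(context: TaskContext, killWorker: () => Unit): Unit = {
      while (!context.isInterrupted && !context.isCompleted) {
        Thread.sleep(2000)
      }
      if (!context.isCompleted) {
        killWorker()
      }
    }
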
@@ -372,8 +372,8 @@ private[spark] object PythonRDD extends Logging {
       batchSize: Int) = {
     val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
     val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
-    val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
-    val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
+    val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
+    val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
     val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
     val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
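
The remaining hunks swap Class.forName for Utils.classForName (org.apache.spark.util.Utils). The difference, as I understand it, is the class loader: Utils.classForName resolves the name against the context/Spark class loader, so classes shipped in user jars are visible, whereas a bare Class.forName uses the caller's defining loader. Since Utils is private[spark], here is a hedged sketch of the equivalent pattern rather than the internal implementation:

    // Illustrative only: resolve a class name against the thread's context
    // class loader, falling back to this class's loader, then initialize it.
    def classForNameSketch(name: String): Class[_] = {
      val loader = Option(Thread.currentThread().getContextClassLoader)
        .getOrElse(getClass.getClassLoader)
      Class.forName(name, true, loader)
    }
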
@@ -440,9 +440,9 @@ private[spark] object PythonRDD extends Logging {
       keyClass: String,
       valueClass: String,
       conf: Configuration) = {
-    val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
-    val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
-    val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+    val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
+    val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
+    val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
     if (path.isDefined) {
       sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
     } else {
@@ -509,9 +509,9 @@ private[spark] object PythonRDD extends Logging {
       keyClass: String,
       valueClass: String,
       conf: Configuration) = {
-    val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
-    val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
-    val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+    val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
+    val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
+    val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
     if (path.isDefined) {
       sc.sc.hadoopFile(path.get, fc, kc, vc)
     } else {
@@ -558,7 +558,7 @@ private[spark] object PythonRDD extends Logging {
     for {
       k <- Option(keyClass)
       v <- Option(valueClass)
-    } yield (Class.forName(k), Class.forName(v))
+    } yield (Utils.classForName(k), Utils.classForName(v))
   }

   private def getKeyValueConverters(keyConverterClass: String, valueConverterClass: String,
@@ -621,10 +621,10 @@ private[spark] object PythonRDD extends Logging {
     val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
       inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
     val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
-    val codec = Option(compressionCodecClass).map(Class.forName(_).asInstanceOf[Class[C]])
+    val codec = Option(compressionCodecClass).map(Utils.classForName(_).asInstanceOf[Class[C]])
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new JavaToWritableConverter)
-    val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+    val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]]
     converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec = codec)
   }

@@ -653,7 +653,7 @@ private[spark] object PythonRDD extends Logging {
     val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new JavaToWritableConverter)
-    val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+    val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]]
     converted.saveAsNewAPIHadoopFile(path, kc, vc, fc, mergedConf)
   }
