Skip to content

Commit 3dad595

Browse files
author
liguoqiang
committed
review comment
1 parent e3e56aa commit 3dad595

File tree

1 file changed

+7
-4
lines changed

1 file changed

+7
-4
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -851,8 +851,10 @@ class SparkContext(
851851
partitions: Seq[Int],
852852
allowLocal: Boolean,
853853
resultHandler: (Int, U) => Unit) {
854-
val outIndex = partitions.toSet.diff(rdd.partitions.map(_.index).toSet)
855-
require(outIndex.isEmpty,"Partition index out of bounds: " + outIndex.mkString(","))
854+
// TODO: All RDDs have continuous index space. How to ensure this?
855+
partitions.foreach{ p =>
856+
require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
857+
}
856858
val callSite = getCallSite
857859
val cleanedFunc = clean(func)
858860
logInfo("Starting job: " + callSite)
@@ -956,8 +958,9 @@ class SparkContext(
956958
resultHandler: (Int, U) => Unit,
957959
resultFunc: => R): SimpleFutureAction[R] =
958960
{
959-
val outIndex = partitions.toSet.diff(rdd.partitions.map(_.index).toSet)
960-
require(outIndex.isEmpty,"Partition index out of bounds: " + outIndex.mkString(","))
961+
partitions.foreach{ p =>
962+
require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
963+
}
961964
val cleanF = clean(processPartition)
962965
val callSite = getCallSite
963966
val waiter = dagScheduler.submitJob(

0 commit comments

Comments
 (0)