Skip to content

Commit 5cce16f

Browse files
Make sure getBlockLocations uses offset and length to find the blocks on HDFS
1 parent eadde56 commit 5cce16f

File tree

2 files changed

+5
-3
lines changed

2 files changed

+5
-3
lines changed

streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ class HDFSBackedBlockRDD[T: ClassTag](
     val partition = split.asInstanceOf[HDFSBackedBlockRDDPartition]
     val locations = getBlockIdLocations()
     locations.getOrElse(partition.blockId,
-      HdfsUtils.getBlockLocations(partition.segment.path, hadoopConfiguration)
+      HdfsUtils.getBlockLocations(partition.segment.path, partition.segment.offset,
+        partition.segment.length, hadoopConfiguration)
         .getOrElse(new Array[String](0)).toSeq)
   }
 }

streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,12 @@ private[streaming] object HdfsUtils {
     }
   }

-  def getBlockLocations(path: String, conf: Configuration): Option[Array[String]] = {
+  def getBlockLocations(path: String, offset: Long, length: Long, conf: Configuration):
+      Option[Array[String]] = {
     val dfsPath = new Path(path)
     val dfs = getFileSystemForPath(dfsPath, conf)
     val fileStatus = dfs.getFileStatus(dfsPath)
-    val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen))
+    val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, offset, length))
     blockLocs.map(_.flatMap(_.getHosts))
   }

0 commit comments

Comments
 (0)