Skip to content

Commit c4a188c

Browse files
committed
Don't remove FileStatus and BlockaLocation for Hadoop 2.7
1 parent 47326ac commit c4a188c

File tree

1 file changed

+60
-1
lines changed

1 file changed

+60
-1
lines changed

core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,12 +136,53 @@ private[spark] object HadoopFSUtils extends Logging {
136136
parallelismMax = 0)
137137
(path, leafFiles)
138138
}.iterator
139+
}.map { case (path, statuses) =>
140+
val serializableStatuses = statuses.map { status =>
141+
// Turn FileStatus into SerializableFileStatus so we can send it back to the driver
142+
val blockLocations = status match {
143+
case f: LocatedFileStatus =>
144+
f.getBlockLocations.map { loc =>
145+
SerializableBlockLocation(
146+
loc.getNames,
147+
loc.getHosts,
148+
loc.getOffset,
149+
loc.getLength)
150+
}
151+
152+
case _ =>
153+
Array.empty[SerializableBlockLocation]
154+
}
155+
156+
SerializableFileStatus(
157+
status.getPath.toString,
158+
status.getLen,
159+
status.isDirectory,
160+
status.getReplication,
161+
status.getBlockSize,
162+
status.getModificationTime,
163+
status.getAccessTime,
164+
blockLocations)
165+
}
166+
(path.toString, serializableStatuses)
139167
}.collect()
140168
} finally {
141169
sc.setJobDescription(previousJobDescription)
142170
}
143171

144-
statusMap.toSeq
172+
// turn SerializableFileStatus back to Status
173+
statusMap.map { case (path, serializableStatuses) =>
174+
val statuses = serializableStatuses.map { f =>
175+
val blockLocations = f.blockLocations.map { loc =>
176+
new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
177+
}
178+
new LocatedFileStatus(
179+
new FileStatus(
180+
f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime,
181+
new Path(f.path)),
182+
blockLocations)
183+
}
184+
(new Path(path), statuses)
185+
}
145186
}
146187

147188
// scalastyle:off argcount
@@ -291,4 +332,22 @@ private[spark] object HadoopFSUtils extends Logging {
291332
resolvedLeafStatuses
292333
}
293334
// scalastyle:on argcount
335+
336+
/** A serializable variant of HDFS's BlockLocation. */
337+
private case class SerializableBlockLocation(
338+
names: Array[String],
339+
hosts: Array[String],
340+
offset: Long,
341+
length: Long)
342+
343+
/** A serializable variant of HDFS's FileStatus. */
344+
private case class SerializableFileStatus(
345+
path: String,
346+
length: Long,
347+
isDir: Boolean,
348+
blockReplication: Short,
349+
blockSize: Long,
350+
modificationTime: Long,
351+
accessTime: Long,
352+
blockLocations: Array[SerializableBlockLocation])
294353
}

0 commit comments

Comments
 (0)