Merge pull request #146 from halfabrane/SPLIT-LARGE-SHAPEFILES
Use Shx Index to split large shapefiles
harsha2010 authored Aug 14, 2017
2 parents d13db84 + c8f97a7 commit e7f91ce
Showing 15 changed files with 2,009 additions and 36 deletions.
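In outline, the change works as follows: the new ShxInputFormat reads each .shx index, whose records store the offset of every shape in the companion .shp file in 16-bit words, and turns those offsets into byte positions at which the .shp file can be split on record boundaries. ShapeFileRelation collects these offsets per file and hands them to ShapeInputFormat, which then emits several record-aligned splits per large shapefile instead of a single split. A minimal sketch of that offset computation, with illustrative names that are not part of the commit:

// Sketch only: derive the byte offsets at which new .shp splits should start,
// given .shx record offsets (in 16-bit words) and a target split size in bytes.
def splitStartOffsets(recordOffsetsInWords: Seq[Long], splitSize: Long): Seq[Long] =
  recordOffsetsInWords.map(_ * 2).foldLeft(Vector.empty[Long]) { (starts, byteOffset) =>
    if (starts.isEmpty || byteOffset - starts.last > splitSize) starts :+ byteOffset
    else starts
  }

// For example, records every ~1 KB with a 4 KB target:
// splitStartOffsets(Seq.tabulate(20)(i => 50L + i * 512L), 4096L) == Vector(100, 5220, 10340, 15460)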
24 changes: 23 additions & 1 deletion src/main/scala/magellan/ShapefileRelation.scala
@@ -20,11 +20,12 @@ import java.util.Objects

import magellan.io._
import magellan.mapreduce._
import org.apache.hadoop.io.{MapWritable, Text}
import org.apache.hadoop.io.{ArrayWritable, LongWritable, MapWritable, Text}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

import scala.collection.JavaConversions._
import scala.util.Try

/**
* A Shapefile relation is the entry point for working with Shapefile formats.
@@ -37,6 +38,27 @@ case class ShapeFileRelation(

protected override def _buildScan(): RDD[Array[Any]] = {

// read the shx files, if they exist
val fileNameToFileSplits = Try(sc.newAPIHadoopFile(
path + "/*.shx",
classOf[ShxInputFormat],
classOf[Text],
classOf[ArrayWritable]
).map { case (txt: Text, splits: ArrayWritable) =>
val fileName = txt.toString
val s = splits.get()
val size = s.length
var i = 0
val v = Array.fill(size)(0L)
while (i < size) {
v.update(i, s(i).asInstanceOf[LongWritable].get())
i += 1
}
(fileName, v)
}.collectAsMap())

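// pass the shx-derived split offsets to ShapeInputFormat.getSplits through a
// ThreadLocal (see the TODO on SplitInfos)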
fileNameToFileSplits.map(SplitInfos.SPLIT_INFO_MAP.set(_))

val shapefileRdd = sqlContext.sparkContext.newAPIHadoopFile(
path + "/*.shp",
classOf[ShapeInputFormat],
4 changes: 1 addition & 3 deletions src/main/scala/magellan/io/ShapeWritable.scala
@@ -22,7 +22,7 @@ import magellan.Shape
import org.apache.commons.io.EndianUtils
import org.apache.hadoop.io.Writable

private[magellan] class ShapeWritable(shapeType: Int) extends Writable {
private[magellan] class ShapeWritable extends Writable {

var shape: Shape = _

@@ -32,8 +32,6 @@ private[magellan] class ShapeWritable(shapeType: Int) extends Writable {

override def readFields(dataInput: DataInput): Unit = {
val shapeType = EndianUtils.swapInteger(dataInput.readInt())
// all records share the same type or nullshape.
require(this.shapeType == shapeType || shapeType == 0)
val h = shapeType match {
case 0 => new NullShapeReader()
case 1 => new PointReader()
66 changes: 61 additions & 5 deletions src/main/scala/magellan/mapreduce/ShapeInputFormat.scala
@@ -16,20 +16,76 @@

package magellan.mapreduce

import org.apache.hadoop.fs.Path
import com.google.common.base.Stopwatch
import magellan.io.{ShapeKey, ShapeWritable}
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.fs.{LocatedFileStatus, Path}
import org.apache.hadoop.mapreduce.lib.input._
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, TaskAttemptContext}

import magellan.io.{ShapeWritable, ShapeKey}
import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer

private[magellan] class ShapeInputFormat extends FileInputFormat[ShapeKey, ShapeWritable] {
private[magellan] class ShapeInputFormat
extends FileInputFormat[ShapeKey, ShapeWritable] {

private val log = LogFactory.getLog(classOf[ShapeInputFormat])

override def createRecordReader(inputSplit: InputSplit,
taskAttemptContext: TaskAttemptContext) = {
new ShapefileReader
}

// TODO: Use DBIndex to figure out how to efficiently split files.
override def isSplitable(context: JobContext, filename: Path): Boolean = false
override def isSplitable(context: JobContext, filename: Path): Boolean = true
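// .shp files can now be split on record boundaries derived from the companion .shx index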

override def getSplits(job: JobContext): java.util.List[InputSplit] = {
val splitInfos = SplitInfos.SPLIT_INFO_MAP.get()
computeSplits(job, splitInfos)
}

private def computeSplits(
job: JobContext,
splitInfos: scala.collection.Map[String, Array[Long]]) = {

val sw = new Stopwatch().start
val splits = ListBuffer[InputSplit]()
val files = listStatus(job)
for (file <- files) {
val path = file.getPath
val length = file.getLen
val blkLocations = if (file.isInstanceOf[LocatedFileStatus]) {
file.asInstanceOf[LocatedFileStatus].getBlockLocations
} else {
val fs = path.getFileSystem(job.getConfiguration)
fs.getFileBlockLocations(file, 0, length)
}
val key = path.getName.split("\\.shp$")(0)
if (splitInfos == null || !splitInfos.containsKey(key)) {
val blkIndex = getBlockIndex(blkLocations, 0)
splits.+= (makeSplit(path, 0, length, blkLocations(blkIndex).getHosts,
blkLocations(blkIndex).getCachedHosts))
} else {
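// splitInfos(key) holds the byte offsets at which splits of this .shp file should
// begin; pair each offset with the next one (or the file length) to form
// record-aligned (start, length) ranges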
val s = splitInfos(key).toSeq
val start = s
val end = s.drop(1) ++ Seq(length)
start.zip(end).foreach { case (startOffset: Long, endOffset: Long) =>
val blkIndex = getBlockIndex(blkLocations, startOffset)
splits.+=(makeSplit(path, startOffset, endOffset - startOffset, blkLocations(blkIndex).getHosts,
blkLocations(blkIndex).getCachedHosts))
}
}
}
sw.stop
if (log.isDebugEnabled) {
log.debug("Total # of splits generated by getSplits: " + splits.size + ", TimeTaken: " + sw.elapsedMillis)
}
splits
}
}

object SplitInfos {

// TODO: Can we get rid of this hack to pass split calculation to the Shapefile Reader?
val SPLIT_INFO_MAP = new ThreadLocal[scala.collection.Map[String, Array[Long]]]

}
43 changes: 20 additions & 23 deletions src/main/scala/magellan/mapreduce/ShapefileReader.scala
@@ -32,11 +32,9 @@ private[magellan] class ShapefileReader extends RecordReader[ShapeKey, ShapeWrit

private var dis: DataInputStream = _

private var length: BigInt = _

private var remaining: BigInt = _

override def getProgress: Float = remaining.toFloat / length.toFloat
override def getProgress: Float = 0

override def nextKeyValue(): Boolean = {
if (remaining <= 0) {
@@ -47,7 +45,7 @@ private[magellan] class ShapefileReader extends RecordReader[ShapeKey, ShapeWrit
val recordNumber = dis.readInt()
// record numbers begin at 1
require(recordNumber > 0)
val contentLength = 16 * (dis.readInt() + 4)
val contentLength = 2 * (dis.readInt() + 4)
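// the content length field is in 16-bit words and excludes the 8-byte (4-word)
// record header, so the whole record occupies 2 * (words + 4) bytes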
value.readFields(dis)
remaining -= contentLength
key.setRecordIndex(key.getRecordIndex() + 1)
@@ -60,27 +58,26 @@ private[magellan] class ShapefileReader extends RecordReader[ShapeKey, ShapeWrit
override def initialize(inputSplit: InputSplit, taskAttemptContext: TaskAttemptContext) {
val split = inputSplit.asInstanceOf[FileSplit]
val job = MapReduceUtils.getConfigurationFromContext(taskAttemptContext)
val start = split.getStart()
val end = start + split.getLength()
val file = split.getPath()
val fs = file.getFileSystem(job)
val is = fs.open(split.getPath())

val path = split.getPath()
val fs = path.getFileSystem(job)
val is = fs.open(path)

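// a split that starts at byte 0 must skip the 100-byte shapefile header; byte 24
// of the header holds the file length in 16-bit words, so such a split covers
// bytes [100, 2 * fileLength); any other split uses the offsets it was given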
val (start, end) = {
val v = split.getStart
if (v == 0) {
is.seek(24)
(100L, 2 * is.readInt().toLong)
} else {
(v, v + split.getLength)
}
}

is.seek(start)
dis = new DataInputStream(is)
require(is.readInt() == 9994)
// skip the next 20 bytes which should all be zero
0 until 5 foreach {_ => require(is.readInt() == 0)}
// file length in bits
val i: BigInt = is.readInt()
length = 16 * i - 50 * 16
remaining = length
val version = EndianUtils.swapInteger(is.readInt())
require(version == 1000)
// shape type: all the shapes in a given split have the same type
val shapeType = EndianUtils.swapInteger(is.readInt())
key.setFileNamePrefix(split.getPath.getName.split("\\.")(0))
value = new ShapeWritable(shapeType)
// skip the next 64 bytes
0 until 8 foreach {_ => is.readDouble()}
value = new ShapeWritable()
remaining = (end - start)
}

override def getCurrentKey: ShapeKey = key
130 changes: 130 additions & 0 deletions src/main/scala/magellan/mapreduce/ShxInputFormat.scala
@@ -0,0 +1,130 @@
/**
* Copyright 2015 Ram Sriharsha
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package magellan.mapreduce

import java.io.DataInputStream

import org.apache.commons.io.EndianUtils
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}

import scala.collection.mutable.ListBuffer

class ShxInputFormat extends FileInputFormat[Text, ArrayWritable] {

override def createRecordReader(
split: InputSplit,
context: TaskAttemptContext): RecordReader[Text, ArrayWritable] = {
new ShxReader()
}

override def isSplitable(context: JobContext, filename: Path): Boolean = false
}

class ShxReader extends RecordReader[Text, ArrayWritable] {

private var dis: DataInputStream = _

override def getProgress: Float = ???

private var done: Boolean = false

private var splits:ArrayWritable = _

private var key: Text = new Text()

private val MAX_SPLIT_SIZE = "mapreduce.input.fileinputformat.split.maxsize"

private val MIN_SPLIT_SIZE = "mapreduce.input.fileinputformat.split.minsize"


override def nextKeyValue(): Boolean = if (done) false else {
done = true
true
}

override def getCurrentValue: ArrayWritable = {
splits
}

override def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit = {
val split = inputSplit.asInstanceOf[FileSplit]
val job = MapReduceUtils.getConfigurationFromContext(context)
val start = split.getStart()
val end = start + split.getLength()
val path = split.getPath()
val fs = path.getFileSystem(job)
key.set(split.getPath.getName.split("\\.")(0))
val is = fs.open(path)
dis = new DataInputStream(is)
require(is.readInt() == 9994)
// skip the next 20 bytes which should all be zero
0 until 5 foreach {_ => require(is.readInt() == 0)}
// file length, in 16-bit words
val len = is.readInt()
val numRecords = (2 * len - 100) / 8
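// len counts 16-bit words, so the file holds 2 * len bytes; subtract the 100-byte
// header and divide by the 8 bytes per index record (4-byte offset + 4-byte length)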

val version = EndianUtils.swapInteger(is.readInt())
require(version == 1000)
// shape type: all the shapes in a given split have the same type
is.readInt()

// skip the next 64 bytes
0 until 8 foreach {_ => is.readDouble()}

// iterate over the offset and content length of each record
var j = 0
val minSplitSize = job.getLong(MIN_SPLIT_SIZE, 1L)
val maxSplitSize = job.getLong(MAX_SPLIT_SIZE, Long.MaxValue)
val shpFileName = path.getName.replaceAll("\\.shx$", ".shp")
val blockSize = fs.getFileStatus(new Path(path.getParent, shpFileName)).getBlockSize
val splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize))
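// target roughly one split per block of the companion .shp file, clamped by the
// configured minimum and maximum split sizes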

// byte offsets (into the .shp file) at which each split should begin
val v = new ListBuffer[Writable]()

var startOffset: Long = Long.MinValue

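// walk the index records, starting a new split whenever the distance from the
// current split start exceeds splitSize; .shx offsets are in 16-bit words, so
// they are doubled to get byte offsets into the .shp file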
while (j < numRecords) {
val offset = dis.readInt()
// skip the next 4 bytes (the content length)
dis.readInt()

if (startOffset == Long.MinValue) {
startOffset = offset
}
else if (offset - startOffset > splitSize) {
v.+= (new LongWritable(startOffset * 2))
startOffset = offset
}
j += 1
}

// if empty add starting offset
if (v.isEmpty) {
v.+= (new LongWritable(startOffset * 2))
}

splits = new ArrayWritable(classOf[LongWritable], v.toArray)
}

override def getCurrentKey: Text = key

override def close() {}
}
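The split boundaries computed by ShxReader are sized against the standard Hadoop split settings read above, clamped by the block size of the companion .shp file, so they can be tuned from the Spark side before loading data. A usage sketch, assuming an existing SparkContext named sc (the 64 MB cap is only an example):

// Example only: cap shapefile splits at ~64 MB so large .shp files are read by several tasks.
sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.maxsize", 64L * 1024 * 1024)
sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize", 1L)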
@@ -0,0 +1 @@
UTF-8
Binary file not shown.
@@ -0,0 +1 @@
GEOGCS["GCS_North_American_1983",DATUM["D_North_American_1983",SPHEROID["GRS_1980",6378137,298.257222101]],PRIMEM["Greenwich",0],UNIT["Degree",0.017453292519943295]]
Binary file not shown.