Add binary option to export rectangles (hail-is#5046)
* added writeBytes to exportRectangles

* complete docs, fix newline bug in tsv and improve test

* add ndmin = 2 to np.loadtxt

* wip

* using
jbloom22 authored and danking committed Jan 2, 2019
1 parent 54567d4 commit bf5fc7d
Showing 3 changed files with 131 additions and 95 deletions.
30 changes: 21 additions & 9 deletions hail/python/hail/linalg/blockmatrix.py
@@ -1828,9 +1828,10 @@ def sparsify_rectangles(self, rectangles):
                path_out=str,
                rectangles=sequenceof(sequenceof(int)),
                delimiter=str,
-               n_partitions=nullable(int))
-    def export_rectangles(path_in, path_out, rectangles, delimiter='\t', n_partitions=None):
-        """Export rectangular regions from a stored block matrix to delimited text files.
+               n_partitions=nullable(int),
+               binary=bool)
+    def export_rectangles(path_in, path_out, rectangles, delimiter='\t', n_partitions=None, binary=False):
+        """Export rectangular regions from a stored block matrix to delimited text or binary files.

         Examples
         --------
@@ -1889,11 +1890,7 @@ def export_rectangles(path_in, path_out, rectangles, delimiter='\t', n_partition
         Notes
         -----
         This method exports rectangular regions of a stored block matrix
-        to delimited text files, in parallel by region.
-
-        The block matrix can be sparse so long as all blocks overlapping
-        the rectangles are present, i.e. this method does not currently
-        support implicit zeros.
+        to delimited text or binary files, in parallel by region.

         Each rectangle is encoded as a list of length four of
         the form ``[row_start, row_stop, col_start, col_stop]``,
@@ -1908,6 +1905,19 @@ def export_rectangles(path_in, path_out, rectangles, delimiter='\t', n_partition
         Each file name encodes the index of the rectangle in `rectangles`
         and the bounds as formatted in the example.

+        The block matrix can be sparse provided all blocks overlapping
+        the rectangles are present, i.e. this method does not currently
+        support implicit zeros.
+
+        If `binary` is true, each element is exported as 8 bytes, in
+        row-major order with no delimiters, newlines, or shape information.
+        Such files can be loaded into NumPy ndarrays using, for example,
+        `fromfile <https://docs.scipy.org/doc/numpy/reference/generated/numpy.fromfile.html>`__
+        and
+        `reshape <https://docs.scipy.org/doc/numpy/reference/generated/numpy.reshape.html>`__.
+        Note, however, that these binary files are not platform-independent;
+        in particular, no byte-order or data-type information is saved.
+
         The number of rectangles must be less than :math:`2^{29}`.

         Parameters
@@ -1924,6 +1934,8 @@ def export_rectangles(path_in, path_out, rectangles, delimiter='\t', n_partition
         n_partitions: :obj:`int`, optional
             Maximum parallelism of export.
             Defaults to (and cannot exceed) the number of rectangles.
+        binary: :obj:`bool`
+            If true, export elements as raw bytes in row-major order.
         """
         n_rectangles = len(rectangles)
         if n_rectangles == 0:
@@ -1954,7 +1966,7 @@ def export_rectangles(path_in, path_out, rectangles, delimiter='\t', n_partition
         flattened_rectangles = jarray(Env.jvm().long, list(itertools.chain(*rectangles)))

         return Env.hail().linalg.BlockMatrix.exportRectangles(
-            Env.hc()._jhc, path_in, path_out, flattened_rectangles, delimiter, n_partitions)
+            Env.hc()._jhc, path_in, path_out, flattened_rectangles, delimiter, n_partitions, binary)

     @typecheck_method(compute_uv=bool,
                       complexity_bound=int)
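Taken together, the Python-side change is easiest to see as a round trip. Below is a minimal sketch of the new binary mode, assuming an initialized local Hail session; the /tmp paths and the 8x10 example matrix are hypothetical, and the output file name follows the rect-<index>_<bounds> scheme described in the docstring. force_row_major=True matters because, as the Scala changes below enforce, only a block matrix stored row-major on disk can be exported.

    import numpy as np
    import hail as hl
    from hail.linalg import BlockMatrix

    hl.init()  # local backend assumed; plain paths resolve via Hadoop

    nd = np.arange(0, 80, dtype=float).reshape(8, 10)

    # Binary export requires row-major storage on disk.
    BlockMatrix.from_numpy(nd).write('/tmp/example.bm', force_row_major=True)

    # Export rows [0, 3) and columns [0, 5) as raw doubles.
    rect = [0, 3, 0, 5]
    BlockMatrix.export_rectangles('/tmp/example.bm', 'file:///tmp/rects', [rect], binary=True)

    # The file holds only row-major float64 bytes; the reader supplies the shape.
    actual = np.fromfile('/tmp/rects/rect-0_0-3-0-5').reshape(3, 5)
    assert np.array_equal(actual, nd[0:3, 0:5])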
16 changes: 14 additions & 2 deletions hail/python/test/hail/linalg/test_linalg.py
@@ -622,7 +622,7 @@ def test_sparsify_rectangles(self):
                                              [13., 14., 0., 0.]]))

         self._assert_eq(bm.sparsify_rectangles([]), np.zeros(shape=(4, 4)))
-
+
     def test_export_rectangles(self):
         nd = np.arange(0, 80, dtype=float).reshape(8, 10)

@@ -637,6 +637,7 @@ def test_export_rectangles(self):
         for rects in [rects1, rects2, rects3]:
             for block_size in [3, 4, 10]:
                 bm_uri = new_temp_file()
+
                 rect_path = new_local_temp_dir()
                 rect_uri = local_path_uri(rect_path)

@@ -649,7 +650,18 @@ def test_export_rectangles(self):
                 for (i, r) in enumerate(rects):
                     file = rect_path + '/rect-' + str(i) + '_' + '-'.join(map(str, r))
                     expected = nd[r[0]:r[1], r[2]:r[3]]
-                    actual = np.reshape(np.loadtxt(file), (r[1] - r[0], r[3] - r[2]))
+                    actual = np.loadtxt(file, ndmin = 2)
                     self._assert_eq(expected, actual)

+                rect_path_bytes = new_local_temp_dir()
+                rect_uri_bytes = local_path_uri(rect_path_bytes)
+
+                BlockMatrix.export_rectangles(bm_uri, rect_uri_bytes, rects, binary=True)
+
+                for (i, r) in enumerate(rects):
+                    file = rect_path_bytes + '/rect-' + str(i) + '_' + '-'.join(map(str, r))
+                    expected = nd[r[0]:r[1], r[2]:r[3]]
+                    actual = np.reshape(np.fromfile(file), (r[1] - r[0], r[3] - r[2]))
+                    self._assert_eq(expected, actual)
+
         bm_uri = new_temp_file()
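The test's switch from np.reshape(np.loadtxt(file), ...) to np.loadtxt(file, ndmin=2) is not cosmetic: by default loadtxt squeezes a single-row file down to a 1-D array, so a 1xN rectangle would come back with the wrong shape. A self-contained illustration, independent of Hail:

    import io
    import numpy as np

    one_row = "1.0\t2.0\t3.0\n"

    # Default behaviour: the row dimension is squeezed away.
    assert np.loadtxt(io.StringIO(one_row)).shape == (3,)

    # ndmin=2 keeps the result two-dimensional, matching nd[r[0]:r[1], r[2]:r[3]].
    assert np.loadtxt(io.StringIO(one_row), ndmin=2).shape == (1, 3)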
180 changes: 96 additions & 84 deletions hail/src/main/scala/is/hail/linalg/BlockMatrix.scala
@@ -14,7 +14,7 @@ import is.hail.io._
 import is.hail.rvd.{RVD, RVDContext}
 import is.hail.sparkextras.ContextRDD
 import is.hail.utils._
-import is.hail.utils.richUtils.RichDenseMatrixDouble
+import is.hail.utils.richUtils.{RichArray, RichDenseMatrixDouble}
 import org.apache.commons.lang3.StringUtils
 import org.apache.commons.math3.random.MersenneTwister
 import org.apache.spark.executor.InputMetrics
@@ -151,7 +151,8 @@ object BlockMatrix {
     output: String,
     flattenedRectangles: Array[Long],
     delimiter: String,
-    nPartitions: Int): Unit = {
+    nPartitions: Int,
+    binary: Boolean): Unit = {
     require(flattenedRectangles.length % 4 == 0)

     checkWriteSuccess(hc, input)
@@ -171,117 +172,128 @@ object BlockMatrix {
       val sb = new StringBuilder(blockSize << 2)
       val paddedIndex = StringUtils.leftPad(index.toString, dRect, "0")
       val outputFile = output + "/rect-" + paddedIndex + "_" + r.mkString("-")

+      sHadoopBc.value.value.writeFile(outputFile) { uos =>
+        using(
+          if (binary)
+            new DoubleOutputBuffer(uos, RichArray.defaultBufSize)
+          else
+            new OutputStreamWriter(uos)
+        ) { os =>
+          val writeData: (Array[Double], Int, Boolean) => Unit =
+            if (binary) {
+              (data: Array[Double], n: Int, _) =>
+                os.asInstanceOf[DoubleOutputBuffer].writeDoubles(data, 0, n)
+            } else {
+              (data: Array[Double], n: Int, endLine: Boolean) =>
+                sb.clear()
+                var k = 0
+                while (k < n - 1) {
+                  sb.append(data(k))
+                  sb.append(delimiter)
+                  k += 1
+                }
+                sb.append(data(n - 1))
+                if (endLine)
+                  sb.append("\n")
+                else
+                  sb.append(delimiter)
+                os.asInstanceOf[OutputStreamWriter].write(sb.result())
+            }

-      val osw = new OutputStreamWriter(sHadoopBc.value.value.unsafeWriter(outputFile))
-      try {
-        val startRow = r(0)
-        val stopRow = r(1)
-        val startCol = r(2)
-        val stopCol = r(3)
+          val startRow = r(0)
+          val stopRow = r(1)
+          val startCol = r(2)
+          val stopCol = r(3)

-        val nonEmpty = startRow < stopRow && startCol < stopCol
+          val nonEmpty = startRow < stopRow && startCol < stopCol

-        if (nonEmpty) {
-          val startRowOffset = gp.indexBlockOffset(startRow)
+          if (nonEmpty) {
+            val startRowOffset = gp.indexBlockOffset(startRow)

-          val startBlockCol = gp.indexBlockIndex(startCol)
-          val startColOffset = gp.indexBlockOffset(startCol)
+            val startBlockCol = gp.indexBlockIndex(startCol)
+            val startColOffset = gp.indexBlockOffset(startCol)

-          val stopBlockCol = gp.indexBlockIndex(stopCol - 1) + 1
-          val stopColOffset = gp.indexBlockOffset(stopCol - 1) + 1
+            val stopBlockCol = gp.indexBlockIndex(stopCol - 1) + 1
+            val stopColOffset = gp.indexBlockOffset(stopCol - 1) + 1

-          val startColByteOffset = startColOffset << 3
-          val stopColByteDeficit = (gp.blockColNCols(stopBlockCol - 1) - stopColOffset) << 3
+            val startColByteOffset = startColOffset << 3
+            val stopColByteDeficit = (gp.blockColNCols(stopBlockCol - 1) - stopColOffset) << 3

-          val inPerBlockCol = new Array[InputBuffer](stopBlockCol - startBlockCol)
-          try {
-            var i = startRow
-            while (i < stopRow) {
-              if (i == startRow || gp.indexBlockOffset(i) == 0) {
-                val blockRow = gp.indexBlockIndex(i)
-                val nRowsInBlock = gp.blockRowNRows(blockRow)
+            val inPerBlockCol = new Array[InputBuffer](stopBlockCol - startBlockCol)
+            try {
+              var i = startRow
+              while (i < stopRow) {
+                if (i == startRow || gp.indexBlockOffset(i) == 0) {
+                  val blockRow = gp.indexBlockIndex(i)
+                  val nRowsInBlock = gp.blockRowNRows(blockRow)

-                var blockCol = startBlockCol
-                while (blockCol < stopBlockCol) {
-                  val pi = gp.coordinatesPart(blockRow, blockCol)
-                  if (pi < 0)
-                    fatal(s"block ($blockRow, $blockCol) missing for rectangle $index " +
-                      s"with bounds ${ r.mkString("[", ", ", "]") }")
+                  var blockCol = startBlockCol
+                  while (blockCol < stopBlockCol) {
+                    val pi = gp.coordinatesPart(blockRow, blockCol)
+                    if (pi < 0)
+                      fatal(s"block ($blockRow, $blockCol) missing for rectangle $index " +
+                        s"with bounds ${ r.mkString("[", ", ", "]") }")

-                  val is = sHadoopBc.value.value.unsafeReader(input + "/parts/" + partFiles(pi))
-                  val in = BlockMatrix.bufferSpec.buildInputBuffer(is)
+                    val is = sHadoopBc.value.value.unsafeReader(input + "/parts/" + partFiles(pi))
+                    val in = BlockMatrix.bufferSpec.buildInputBuffer(is)

-                  val nColsInBlock = gp.blockColNCols(blockCol)
+                    val nColsInBlock = gp.blockColNCols(blockCol)

-                  assert(in.readInt() == nRowsInBlock)
-                  assert(in.readInt() == nColsInBlock)
-                  val isTranspose = in.readBoolean()
-                  if (!isTranspose)
-                    fatal("BlockMatrix must be stored row major on disk in order to export rectangular regions.")
+                    assert(in.readInt() == nRowsInBlock)
+                    assert(in.readInt() == nColsInBlock)
+                    val isTranspose = in.readBoolean()
+                    if (!isTranspose)
+                      fatal("BlockMatrix must be stored row major on disk in order to export rectangular regions.")

-                  if (i == startRow) {
-                    val skip = startRowOffset * (nColsInBlock << 3)
-                    in.skipBytes(skip)
-                  }
+                    if (i == startRow) {
+                      val skip = startRowOffset * (nColsInBlock << 3)
+                      in.skipBytes(skip)
+                    }

-                  inPerBlockCol(blockCol - startBlockCol) = in
+                    inPerBlockCol(blockCol - startBlockCol) = in

-                  blockCol += 1
-                }
-              }
+                    blockCol += 1
+                  }
+                }

-              inPerBlockCol.head.skipBytes(startColByteOffset)
+                inPerBlockCol.head.skipBytes(startColByteOffset)

-              var blockCol = startBlockCol
-              while (blockCol < stopBlockCol) {
-                val startColOffsetInBlock =
-                  if (blockCol > startBlockCol)
-                    0
-                  else
-                    startColOffset
+                var blockCol = startBlockCol
+                while (blockCol < stopBlockCol) {
+                  val startColOffsetInBlock =
+                    if (blockCol > startBlockCol)
+                      0
+                    else
+                      startColOffset

-                val stopColOffsetInBlock =
-                  if (blockCol < stopBlockCol - 1)
-                    blockSize
-                  else
-                    stopColOffset
+                  val stopColOffsetInBlock =
+                    if (blockCol < stopBlockCol - 1)
+                      blockSize
+                    else
+                      stopColOffset

-                val n = stopColOffsetInBlock - startColOffsetInBlock
+                  val n = stopColOffsetInBlock - startColOffsetInBlock
+                  inPerBlockCol(blockCol - startBlockCol).readDoubles(data, 0, n)
+                  val endLine = blockCol + 1 == stopBlockCol

-                inPerBlockCol(blockCol - startBlockCol).readDoubles(data, 0, n)
+                  writeData(data, n, endLine)

-                sb.clear()
-                var k = 0
-                while (k < n - 1) {
-                  sb.append(data(k))
-                  sb.append(delimiter)
-                  k += 1
-                }
-                sb.append(data(n - 1))
-                if (blockCol < stopBlockCol)
-                  sb.append(delimiter)
-                else
-                  sb.append("\n")
+                  blockCol += 1
+                }

-                osw.write(sb.result())
+                i += 1

-                blockCol += 1
-              }
-              i += 1
+                inPerBlockCol.last.skipBytes(stopColByteDeficit)

-              inPerBlockCol.last.skipBytes(stopColByteDeficit)
+                if (i % blockSize == 0 && i < stopRow)
+                  inPerBlockCol.foreach(_.close())
+              }
+            } finally {
+              inPerBlockCol.foreach(in => if (in != null) in.close())
+            }
+          }
+        }
+      }

-              if (i % blockSize == 0 && i < stopRow)
-                inPerBlockCol.foreach(_.close())
-            }
-          } finally {
-            inPerBlockCol.foreach(in => if (in != null) in.close())
-          }
-        }
-      } finally {
-        osw.close()
-      }
-
       1
     }

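Because the DoubleOutputBuffer path above writes bare float64 values with no header, a reader on another platform must pin the data type and byte order itself, as the new docstring warns. A sketch of a defensive reader in Python, with a hypothetical path and bounds; '<f8' (little-endian float64) is an assumption about the writing machine, since the file records neither:

    import numpy as np

    # Hypothetical rectangle [2, 6, 1, 9], exported as rect-0_2-6-1-9.
    row_start, row_stop, col_start, col_stop = 2, 6, 1, 9
    path = '/tmp/rects/rect-0_2-6-1-9'

    n_rows, n_cols = row_stop - row_start, col_stop - col_start

    # Pin dtype and byte order rather than trusting the reading machine's defaults.
    data = np.fromfile(path, dtype=np.dtype('<f8'))
    assert data.size == n_rows * n_cols
    rect = data.reshape(n_rows, n_cols)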
