Skip to content

Commit

Permalink
convert some matrix operations to IR (hail-is#5018)
Browse files Browse the repository at this point in the history
* convert some matrix operations to IR

* a bit more cleanup, fixed tests

* fixed bug
  • Loading branch information
cseed authored and danking committed Dec 20, 2018
1 parent 110f78f commit dfcabf9
Show file tree
Hide file tree
Showing 14 changed files with 144 additions and 159 deletions.
6 changes: 3 additions & 3 deletions hail/python/hail/ir/matrix_ir.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,14 +164,14 @@ def render(self, r):
r(self.child))

class MatrixRepartition(MatrixIR):
def __init__(self, child, n, shuffle):
def __init__(self, child, n, strategy):
super().__init__()
self.child = child
self.n = n
self.shuffle = shuffle
self.strategy = strategy

def render(self, r):
return f'(MatrixRepartition {r(self.child)} {self.n} {self.shuffle})'
return f'(MatrixRepartition {r(self.child)} {self.n} {self.strategy})'


class MatrixUnionRows(MatrixIR):
Expand Down
10 changes: 7 additions & 3 deletions hail/python/hail/ir/table_ir.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,16 +223,20 @@ def __init__(self, child):
def render(self, r):
return f'(TableDistinct {r(self.child)})'

class RepartitionStrategy:
SHUFFLE = 0
COALESCE = 1
NAIVE_COALESCE = 2

class TableRepartition(TableIR):
def __init__(self, child, n, shuffle):
def __init__(self, child, n, strategy):
super().__init__()
self.child = child
self.n = n
self.shuffle = shuffle
self.strategy = strategy

def render(self, r):
return f'(TableRepartition {self.n} {self.shuffle} {r(self.child)})'
return f'(TableRepartition {self.n} {self.strategy} {r(self.child)})'


class CastMatrixToTable(TableIR):
Expand Down
48 changes: 28 additions & 20 deletions hail/python/hail/matrixtable.py
Original file line number Diff line number Diff line change
Expand Up @@ -2176,7 +2176,8 @@ def globals_table(self) -> Table:
:class:`.Table`
Table with the globals from the matrix, with a single row.
"""
return Table._from_java(self._jmt.globalsTable())
return Table.parallelize(
[hl.eval(self.globals)], self._global_type)

def rows(self) -> Table:
"""Returns a table with all row fields in the matrix.
Expand Down Expand Up @@ -2354,7 +2355,8 @@ def index_rows(self, *exprs):

if is_row_key:
def joiner(left):
return MatrixTable._from_java(left._jmt.annotateRowsVDS(right._jmt, uid))
return MatrixTable(MatrixAnnotateRowsTable(
left._mir, right.rows()._tir, uid, None))
schema = tstruct(**{f: t for f, t in self.row.dtype.items() if f not in self.row_key})
ir = Join(GetField(TopLevelReference('va'), uid),
uids_to_delete,
Expand Down Expand Up @@ -2510,7 +2512,8 @@ def joiner(left: MatrixTable):

@typecheck_method(entries_field_name=str, cols_field_name=str)
def _localize_entries(self, entries_field_name, cols_field_name):
return Table._from_java(self._jmt.localizeEntries(entries_field_name, cols_field_name))
return Table(CastMatrixToTable(
self._mir, entries_field_name, cols_field_name))

@typecheck_method(row_exprs=dictof(str, expr_any),
col_exprs=dictof(str, expr_any),
Expand Down Expand Up @@ -2709,7 +2712,7 @@ def n_partitions(self) -> int:
@typecheck_method(n_partitions=int,
shuffle=bool)
def repartition(self, n_partitions: int, shuffle: bool = True) -> 'MatrixTable':
"""Increase or decrease the number of partitions.
"""Change the number of partitions.
Examples
--------
Expand All @@ -2734,17 +2737,15 @@ def repartition(self, n_partitions: int, shuffle: bool = True) -> 'MatrixTable':
can allow one to take advantage of more cores. Partitions are a core
concept of distributed computation in Spark, see `their documentation
<http://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds>`__
for details. With ``shuffle=True``, Hail does a full shuffle of the data
and creates equal sized partitions. With ``shuffle=False``, Hail
combines existing partitions to avoid a full shuffle. These algorithms
correspond to the `repartition` and `coalesce` commands in Spark,
respectively. In particular, when ``shuffle=False``, ``n_partitions``
cannot exceed current number of partitions.
for details.
Note
----
If `shuffle` is ``False``, the number of partitions may only be
reduced, not increased.
When ``shuffle=True``, Hail does a full shuffle of the data
and creates equal sized partitions. When ``shuffle=False``,
Hail combines existing partitions to avoid a full
shuffle. These algorithms correspond to the `repartition` and
`coalesce` commands in Spark, respectively. In particular,
when ``shuffle=False``, ``n_partitions`` cannot exceed current
number of partitions.
Parameters
----------
Expand All @@ -2758,7 +2759,10 @@ def repartition(self, n_partitions: int, shuffle: bool = True) -> 'MatrixTable':
:class:`.MatrixTable`
Repartitioned dataset.
"""
return MatrixTable(MatrixRepartition(self._mir, n_partitions, shuffle))

return MatrixTable(MatrixRepartition(
self._mir, n_partitions,
RepartitionStrategy.SHUFFLE if shuffle else RepartitionStrategy.COALESCE))

@typecheck_method(max_partitions=int)
def naive_coalesce(self, max_partitions: int) -> 'MatrixTable':
Expand Down Expand Up @@ -2789,7 +2793,9 @@ def naive_coalesce(self, max_partitions: int) -> 'MatrixTable':
:class:`.MatrixTable`
Matrix table with at most `max_partitions` partitions.
"""
return MatrixTable._from_java(self._jmt.naiveCoalesce(max_partitions))

return MatrixTable(MatrixRepartition(
self._mir, max_partitions, RepartitionStrategy.NAIVE_COALESCE))

def cache(self) -> 'MatrixTable':
"""Persist the dataset in memory.
Expand Down Expand Up @@ -3164,9 +3170,11 @@ def from_rows_table(cls, table: Table) -> 'MatrixTable':
-------
:class:`.MatrixTable`
"""
hail.methods.misc.require_key(table, 'from_rows_table')
jmt = scala_object(Env.hail().variant, 'MatrixTable').fromRowsTable(table._jt)
return MatrixTable._from_java(jmt)
col_values_uid = Env.get_uid()
entries_uid = Env.get_uid()
return (table.annotate_globals(**{col_values_uid: hl.empty_array(hl.tstruct())})
.annotate(**{entries_uid: hl.empty_array(hl.tstruct())})
._unlocalize_entries(entries_uid, col_values_uid, []))

@typecheck_method(p=numeric,
seed=nullable(int))
Expand Down Expand Up @@ -3259,7 +3267,7 @@ def distinct_by_row(self):
-------
:class:`.MatrixTable`
"""
return MatrixTable._from_java(self._jmt.distinctByRow())
return MatrixTable(MatrixDistinctByRow(self._mir))

def distinct_by_col(self):
"""Remove columns with a duplicate row key.
Expand Down
77 changes: 65 additions & 12 deletions hail/python/hail/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1772,36 +1772,89 @@ def sample(self, p, seed=None):
@typecheck_method(n=int,
shuffle=bool)
def repartition(self, n, shuffle=True):
"""Change the number of distributed partitions.
"""Change the number of partitions.
Examples
--------
Repartition to 10 partitions:
>>> table_result = table1.repartition(10)
Repartition to 500 partitions:
Warning
-------
When `shuffle` is ``False``, `repartition` can only decrease the number
of partitions and simply combines adjacent partitions to achieve the
desired number. It does not attempt to rebalance and so can produce a
heavily unbalanced dataset. An unbalanced dataset can be inefficient to
operate on because the work is not evenly distributed across partitions.
>>> table_result = table1.repartition(500)
Notes
-----
Check the current number of partitions with :meth:`.n_partitions`.
The data in a dataset is divided into chunks called partitions, which
may be stored together or across a network, so that each partition may
be read and processed in parallel by available cores. When a table with
:math:`M` rows is first imported, each of the :math:`k` partitions will
contain about :math:`M/k` of the rows. Since each partition has some
computational overhead, decreasing the number of partitions can improve
performance after significant filtering. Since it's recommended to have
at least 2 - 4 partitions per core, increasing the number of partitions
can allow one to take advantage of more cores. Partitions are a core
concept of distributed computation in Spark, see `their documentation
<http://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds>`__
for details.
When ``shuffle=True``, Hail does a full shuffle of the data
and creates equal sized partitions. When ``shuffle=False``,
Hail combines existing partitions to avoid a full shuffle.
These algorithms correspond to the `repartition` and
`coalesce` commands in Spark, respectively. In particular,
when ``shuffle=False``, ``n_partitions`` cannot exceed current
number of partitions.
Parameters
----------
n : int
Desired number of partitions.
shuffle : bool
If ``True``, shuffle data. Otherwise, naively coalesce.
If ``True``, use full shuffle to repartition.
Returns
-------
:class:`.Table`
Repartitioned table.
"""

return Table(TableRepartition(self._tir, n, shuffle))
return Table(TableRepartition(
self._tir, n, RepartitionStrategy.NAIVE_COALESCE))

@typecheck_method(max_partitions=int)
def naive_coalesce(self, max_partitions: int) -> 'MatrixTable':
"""Naively decrease the number of partitions.
Example
-------
Naively repartition to 10 partitions:
>>> table_result = table1.naive_coalesce(10)
Warning
-------
:meth:`.naive_coalesce` simply combines adjacent partitions to achieve
the desired number. It does not attempt to rebalance, unlike
:meth:`.repartition`, so it can produce a heavily unbalanced dataset. An
unbalanced dataset can be inefficient to operate on because the work is
not evenly distributed across partitions.
Parameters
----------
max_partitions : int
Desired number of partitions. If the current number of partitions is
less than or equal to `max_partitions`, do nothing.
Returns
-------
:class:`.Table`
Table with at most `max_partitions` partitions.
"""

return Table(TableRepartition(
self._tir, max_partitions, RepartitionStrategy.NAIVE_COALESCE))

@typecheck_method(right=table_type,
how=enumeration('inner', 'outer', 'left', 'right'),
Expand Down
4 changes: 2 additions & 2 deletions hail/python/test/hail/test_ir.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def table_irs(self):
ir.MakeStruct([
('foo', ir.NA(hl.tarray(hl.tint32)))])),
ir.TableRange(100, 10),
ir.TableRepartition(table_read, 10, False),
ir.TableRepartition(table_read, 10, ir.RepartitionStrategy.COALESCE),
ir.TableUnion(
[ir.TableRange(100, 10), ir.TableRange(50, 10)]),
ir.TableExplode(table_read, 'mset'),
Expand Down Expand Up @@ -186,7 +186,7 @@ def test_matrix_ir_parses(self):
matrix_range = ir.MatrixRead(ir.MatrixRangeReader(1, 1, 10))

matrix_irs = [
ir.MatrixRepartition(matrix_range, 100, True),
ir.MatrixRepartition(matrix_range, 100, ir.RepartitionStrategy.SHUFFLE),
ir.MatrixUnionRows(matrix_range, matrix_range),
ir.MatrixDistinctByRow(matrix_range),
ir.CastTableToMatrix(
Expand Down
4 changes: 2 additions & 2 deletions hail/src/main/scala/is/hail/expr/ir/MatrixIR.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2083,14 +2083,14 @@ case class MatrixExplodeRows(child: MatrixIR, path: IndexedSeq[String]) extends
}
}

case class MatrixRepartition(child: MatrixIR, n: Int, shuffle: Boolean) extends MatrixIR {
case class MatrixRepartition(child: MatrixIR, n: Int, strategy: Int) extends MatrixIR {
val typ: MatrixType = child.typ

def children: IndexedSeq[BaseIR] = FastIndexedSeq(child)

def copy(newChildren: IndexedSeq[BaseIR]): MatrixRepartition = {
val IndexedSeq(newChild: MatrixIR) = newChildren
MatrixRepartition(newChild, n, shuffle)
MatrixRepartition(newChild, n, strategy)
}

override def columnCount: Option[Int] = child.columnCount
Expand Down
8 changes: 4 additions & 4 deletions hail/src/main/scala/is/hail/expr/ir/Parser.scala
Original file line number Diff line number Diff line change
Expand Up @@ -820,9 +820,9 @@ object IRParser {
TableKeyByAndAggregate(child, expr, newKey, nPartitions, bufferSize)
case "TableRepartition" =>
val n = int32_literal(it)
val shuffle = boolean_literal(it)
val strategy = int32_literal(it)
val child = table_ir(env)(it)
TableRepartition(child, n, shuffle)
TableRepartition(child, n, strategy)
case "TableHead" =>
val n = int64_literal(it)
val child = table_ir(env)(it)
Expand Down Expand Up @@ -997,8 +997,8 @@ object IRParser {
case "MatrixRepartition" =>
val child = matrix_ir(env)(it)
val n = int32_literal(it)
val shuffle = boolean_literal(it)
MatrixRepartition(child, n, shuffle)
val strategy = int32_literal(it)
MatrixRepartition(child, n, strategy)
case "MatrixUnionRows" =>
val children = matrix_ir_children(env)(it)
MatrixUnionRows(children)
Expand Down
4 changes: 2 additions & 2 deletions hail/src/main/scala/is/hail/expr/ir/Pretty.scala
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ object Pretty {
prettyStringLiteral(uid)
case MatrixExplodeRows(_, path) => prettyIdentifiers(path)
case MatrixExplodeCols(_, path) => prettyIdentifiers(path)
case MatrixRepartition(_, n, shuffle) => n.toString + " " + prettyBooleanLiteral(shuffle)
case MatrixRepartition(_, n, strategy) => s"$n $strategy"
case MatrixChooseCols(_, oldIndices) => prettyInts(oldIndices)
case MatrixMapCols(_, _, newKey) => prettyStringsOpt(newKey)
case MatrixKeyRowsBy(_, keys, isSorted) =>
Expand Down Expand Up @@ -260,7 +260,7 @@ object Pretty {
prettyIdentifiers(keys) + " " +
prettyBooleanLiteral(isSorted)
case TableRange(n, nPartitions) => s"$n $nPartitions"
case TableRepartition(_, n, shuffle) => n.toString + " " + prettyBooleanLiteral(shuffle)
case TableRepartition(_, n, strategy) => s"$n $strategy"
case TableHead(_, n) => n.toString
case TableJoin(_, _, joinType, joinKey) => s"$joinType $joinKey"
case TableLeftJoinRightDistinct(_, _, root) => prettyIdentifier(root)
Expand Down
20 changes: 16 additions & 4 deletions hail/src/main/scala/is/hail/expr/ir/TableIR.scala
Original file line number Diff line number Diff line change
Expand Up @@ -347,11 +347,17 @@ case class TableHead(child: TableIR, n: Long) extends TableIR {
}
}

case class TableRepartition(child: TableIR, n: Int, shuffle: Boolean) extends TableIR {
object RepartitionStrategy {
val SHUFFLE: Int = 0
val COALESCE: Int = 1
val NAIVE_COALESCE: Int = 2
}

case class TableRepartition(child: TableIR, n: Int, strategy: Int) extends TableIR {
def typ: TableType = child.typ

override lazy val rvdType: RVDType = {
if (shuffle)
if (strategy == RepartitionStrategy.SHUFFLE)
typ.canonicalRVDType
else
child.rvdType
Expand All @@ -361,12 +367,18 @@ case class TableRepartition(child: TableIR, n: Int, shuffle: Boolean) extends Ta

def copy(newChildren: IndexedSeq[BaseIR]): TableRepartition = {
val IndexedSeq(newChild: TableIR) = newChildren
TableRepartition(newChild, n, shuffle)
TableRepartition(newChild, n, strategy)
}

protected[ir] override def execute(hc: HailContext): TableValue = {
val prev = child.execute(hc)
prev.copy(rvd = prev.rvd.coalesce(n, shuffle))
val rvd = strategy match {
case RepartitionStrategy.SHUFFLE => prev.rvd.coalesce(n, shuffle = true)
case RepartitionStrategy.COALESCE => prev.rvd.coalesce(n, shuffle = false)
case RepartitionStrategy.NAIVE_COALESCE => prev.rvd.naiveCoalesce(n)
}

prev.copy(rvd = rvd)
}
}

Expand Down
Loading

0 comments on commit dfcabf9

Please sign in to comment.