Skip to content

[SPARK-9656] [MLlib] [Python] Add missing methods to PySpark's Distributed Linear Algebra Classes #9441

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ class IndexedRowMatrix @Since("1.0.0") (
}

/**
* Computes the Gramian matrix `A^T A`.
* Computes the Gramian matrix `A^T A`. Note that this cannot be
* computed on matrices with more than 65535 columns.
*/
@Since("1.0.0")
def computeGramianMatrix(): Matrix = {
Expand Down
32 changes: 31 additions & 1 deletion python/pyspark/mllib/linalg/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@

import numpy as np

from pyspark import since
from pyspark.sql.types import UserDefinedType, StructField, StructType, ArrayType, DoubleType, \
IntegerType, ByteType, BooleanType


__all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors',
'Matrix', 'DenseMatrix', 'SparseMatrix', 'Matrices']
'Matrix', 'DenseMatrix', 'SparseMatrix', 'Matrices',
'QRDecomposition']


if sys.version_info[:2] == (2, 7):
Expand Down Expand Up @@ -1235,6 +1237,34 @@ def sparse(numRows, numCols, colPtrs, rowIndices, values):
return SparseMatrix(numRows, numCols, colPtrs, rowIndices, values)


class QRDecomposition(object):
"""
.. note:: Experimental

Represents QR factors.
"""
def __init__(self, Q, R):
self._Q = Q
self._R = R

@property
@since('2.0.0')
def Q(self):
"""
An orthogonal matrix Q in a QR decomposition.
May be null if not computed.
"""
return self._Q

@property
@since('2.0.0')
def R(self):
"""
An upper triangular matrix R in a QR decomposition.
"""
return self._R


def _test():
import doctest
(failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS)
Expand Down
270 changes: 268 additions & 2 deletions python/pyspark/mllib/linalg/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@

from py4j.java_gateway import JavaObject

from pyspark import RDD
from pyspark import RDD, since
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
from pyspark.mllib.linalg import _convert_to_vector, Matrix
from pyspark.mllib.linalg import _convert_to_vector, Matrix, QRDecomposition
from pyspark.mllib.stat import MultivariateStatisticalSummary
from pyspark.storagelevel import StorageLevel


__all__ = ['DistributedMatrix', 'RowMatrix', 'IndexedRow',
Expand Down Expand Up @@ -151,6 +153,156 @@ def numCols(self):
"""
return self._java_matrix_wrapper.call("numCols")

@since('2.0.0')
def computeColumnSummaryStatistics(self):
"""
Computes column-wise summary statistics.

:return: :class:`MultivariateStatisticalSummary` object
containing column-wise summary statistics.

>>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6]])
>>> mat = RowMatrix(rows)

>>> colStats = mat.computeColumnSummaryStatistics()
>>> colStats.mean()
array([ 2.5, 3.5, 4.5])
"""
java_col_stats = self._java_matrix_wrapper.call("computeColumnSummaryStatistics")
return MultivariateStatisticalSummary(java_col_stats)

@since('2.0.0')
def computeCovariance(self):
"""
Computes the covariance matrix, treating each row as an
observation. Note that this cannot be computed on matrices
with more than 65535 columns.

>>> rows = sc.parallelize([[1, 2], [2, 1]])
>>> mat = RowMatrix(rows)

>>> mat.computeCovariance()
DenseMatrix(2, 2, [0.5, -0.5, -0.5, 0.5], 0)
"""
return self._java_matrix_wrapper.call("computeCovariance")

@since('2.0.0')
def computeGramianMatrix(self):
"""
Computes the Gramian matrix `A^T A`. Note that this cannot be
computed on matrices with more than 65535 columns.

>>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6]])
>>> mat = RowMatrix(rows)

>>> mat.computeGramianMatrix()
DenseMatrix(3, 3, [17.0, 22.0, 27.0, 22.0, 29.0, 36.0, 27.0, 36.0, 45.0], 0)
"""
return self._java_matrix_wrapper.call("computeGramianMatrix")

@since('2.0.0')
def columnSimilarities(self, threshold=0.0):
"""
Compute similarities between columns of this matrix.

The threshold parameter is a trade-off knob between estimate
quality and computational cost.

The default threshold setting of 0 guarantees deterministically
correct results, but uses the brute-force approach of computing
normalized dot products.

Setting the threshold to positive values uses a sampling
approach and incurs strictly less computational cost than the
brute-force approach. However the similarities computed will
be estimates.

The sampling guarantees relative-error correctness for those
pairs of columns that have similarity greater than the given
similarity threshold.

To describe the guarantee, we set some notation:
* Let A be the smallest in magnitude non-zero element of
this matrix.
* Let B be the largest in magnitude non-zero element of
this matrix.
* Let L be the maximum number of non-zeros per row.

For example, for {0,1} matrices: A=B=1.
Another example, for the Netflix matrix: A=1, B=5

For those column pairs that are above the threshold, the
computed similarity is correct to within 20% relative error
with probability at least 1 - (0.981)^10/B^

The shuffle size is bounded by the *smaller* of the following
two expressions:

* O(n log(n) L / (threshold * A))
* O(m L^2^)

The latter is the cost of the brute-force approach, so for
non-zero thresholds, the cost is always cheaper than the
brute-force approach.

:param: threshold: Set to 0 for deterministic guaranteed
correctness. Similarities above this
threshold are estimated with the cost vs
estimate quality trade-off described above.
:return: An n x n sparse upper-triangular CoordinateMatrix of
cosine similarities between columns of this matrix.

>>> rows = sc.parallelize([[1, 2], [1, 5]])
>>> mat = RowMatrix(rows)

>>> sims = mat.columnSimilarities()
>>> sims.entries.first().value
0.91914503...
"""
java_sims_mat = self._java_matrix_wrapper.call("columnSimilarities", float(threshold))
return CoordinateMatrix(java_sims_mat)

@since('2.0.0')
def tallSkinnyQR(self, computeQ=False):
"""
Compute the QR decomposition of this RowMatrix.

The implementation is designed to optimize the QR decomposition
(factorization) for the RowMatrix of a tall and skinny shape.

Reference:
Paul G. Constantine, David F. Gleich. "Tall and skinny QR
factorizations in MapReduce architectures"
([[http://dx.doi.org/10.1145/1996092.1996103]])

:param: computeQ: whether to computeQ
:return: QRDecomposition(Q: RowMatrix, R: Matrix), where
Q = None if computeQ = false.

>>> rows = sc.parallelize([[3, -6], [4, -8], [0, 1]])
>>> mat = RowMatrix(rows)
>>> decomp = mat.tallSkinnyQR(True)
>>> Q = decomp.Q
>>> R = decomp.R

>>> # Test with absolute values
>>> absQRows = Q.rows.map(lambda row: abs(row.toArray()).tolist())
>>> absQRows.collect()
[[0.6..., 0.0], [0.8..., 0.0], [0.0, 1.0]]

>>> # Test with absolute values
>>> abs(R.toArray()).tolist()
[[5.0, 10.0], [0.0, 1.0]]
"""
decomp = JavaModelWrapper(self._java_matrix_wrapper.call("tallSkinnyQR", computeQ))
if computeQ:
java_Q = decomp.call("Q")
Q = RowMatrix(java_Q)
else:
Q = None
R = decomp.call("R")
return QRDecomposition(Q, R)


class IndexedRow(object):
"""
Expand Down Expand Up @@ -311,6 +463,21 @@ def columnSimilarities(self):
java_coordinate_matrix = self._java_matrix_wrapper.call("columnSimilarities")
return CoordinateMatrix(java_coordinate_matrix)

@since('2.0.0')
def computeGramianMatrix(self):
"""
Computes the Gramian matrix `A^T A`. Note that this cannot be
computed on matrices with more than 65535 columns.

>>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
... IndexedRow(1, [4, 5, 6])])
>>> mat = IndexedRowMatrix(rows)

>>> mat.computeGramianMatrix()
DenseMatrix(3, 3, [17.0, 22.0, 27.0, 22.0, 29.0, 36.0, 27.0, 36.0, 45.0], 0)
"""
return self._java_matrix_wrapper.call("computeGramianMatrix")

def toRowMatrix(self):
"""
Convert this matrix to a RowMatrix.
Expand Down Expand Up @@ -514,6 +681,26 @@ def numCols(self):
"""
return self._java_matrix_wrapper.call("numCols")

@since('2.0.0')
def transpose(self):
"""
Transpose this CoordinateMatrix.

>>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
... MatrixEntry(1, 0, 2),
... MatrixEntry(2, 1, 3.7)])
>>> mat = CoordinateMatrix(entries)
>>> mat_transposed = mat.transpose()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this blank line intentional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I like the visual clarity when viewing these tests on the Python docs, as it helps indicate that the following two tests rely on the data structures formed above. This is generally the pattern I've followed with these classes for cases with >1 test.

>>> print(mat_transposed.numRows())
2

>>> print(mat_transposed.numCols())
3
"""
java_transposed_matrix = self._java_matrix_wrapper.call("transpose")
return CoordinateMatrix(java_transposed_matrix)

def toRowMatrix(self):
"""
Convert this matrix to a RowMatrix.
Expand Down Expand Up @@ -789,6 +976,33 @@ def numCols(self):
"""
return self._java_matrix_wrapper.call("numCols")

@since('2.0.0')
def cache(self):
"""
Caches the underlying RDD.
"""
self._java_matrix_wrapper.call("cache")
return self

@since('2.0.0')
def persist(self, storageLevel):
"""
Persists the underlying RDD with the specified storage level.
"""
if not isinstance(storageLevel, StorageLevel):
raise TypeError("`storageLevel` should be a StorageLevel, got %s" % type(storageLevel))
javaStorageLevel = self._java_matrix_wrapper._sc._getJavaStorageLevel(storageLevel)
self._java_matrix_wrapper.call("persist", javaStorageLevel)
return self

@since('2.0.0')
def validate(self):
"""
Validates the block matrix info against the matrix data (`blocks`)
and throws an exception if any error is found.
"""
self._java_matrix_wrapper.call("validate")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

subtract now exists on the Scale side, it can be added here.

def add(self, other):
"""
Adds two block matrices together. The matrices must have the
Expand Down Expand Up @@ -822,6 +1036,41 @@ def add(self, other):
java_block_matrix = self._java_matrix_wrapper.call("add", other_java_block_matrix)
return BlockMatrix(java_block_matrix, self.rowsPerBlock, self.colsPerBlock)

@since('2.0.0')
def subtract(self, other):
"""
Subtracts the given block matrix `other` from this block matrix:
`this - other`. The matrices must have the same size and
matching `rowsPerBlock` and `colsPerBlock` values. If one of
the sub matrix blocks that are being subtracted is a
SparseMatrix, the resulting sub matrix block will also be a
SparseMatrix, even if it is being subtracted from a DenseMatrix.
If two dense sub matrix blocks are subtracted, the output block
will also be a DenseMatrix.

>>> dm1 = Matrices.dense(3, 2, [3, 1, 5, 4, 6, 2])
>>> dm2 = Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12])
>>> sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 1, 2], [1, 2, 3])
>>> blocks1 = sc.parallelize([((0, 0), dm1), ((1, 0), dm2)])
>>> blocks2 = sc.parallelize([((0, 0), dm2), ((1, 0), dm1)])
>>> blocks3 = sc.parallelize([((0, 0), sm), ((1, 0), dm2)])
>>> mat1 = BlockMatrix(blocks1, 3, 2)
>>> mat2 = BlockMatrix(blocks2, 3, 2)
>>> mat3 = BlockMatrix(blocks3, 3, 2)

>>> mat1.subtract(mat2).toLocalMatrix()
DenseMatrix(6, 2, [-4.0, -7.0, -4.0, 4.0, 7.0, 4.0, -6.0, -5.0, -10.0, 6.0, 5.0, 10.0], 0)

>>> mat2.subtract(mat3).toLocalMatrix()
DenseMatrix(6, 2, [6.0, 8.0, 9.0, -4.0, -7.0, -4.0, 10.0, 9.0, 9.0, -6.0, -5.0, -10.0], 0)
"""
if not isinstance(other, BlockMatrix):
raise TypeError("Other should be a BlockMatrix, got %s" % type(other))

other_java_block_matrix = other._java_matrix_wrapper._java_model
java_block_matrix = self._java_matrix_wrapper.call("subtract", other_java_block_matrix)
return BlockMatrix(java_block_matrix, self.rowsPerBlock, self.colsPerBlock)

def multiply(self, other):
"""
Left multiplies this BlockMatrix by `other`, another
Expand Down Expand Up @@ -857,6 +1106,23 @@ def multiply(self, other):
java_block_matrix = self._java_matrix_wrapper.call("multiply", other_java_block_matrix)
return BlockMatrix(java_block_matrix, self.rowsPerBlock, self.colsPerBlock)

@since('2.0.0')
def transpose(self):
"""
Transpose this BlockMatrix. Returns a new BlockMatrix
instance sharing the same underlying data. Is a lazy operation.

>>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
>>> mat = BlockMatrix(blocks, 3, 2)

>>> mat_transposed = mat.transpose()
>>> mat_transposed.toLocalMatrix()
DenseMatrix(2, 6, [1.0, 4.0, 2.0, 5.0, 3.0, 6.0, 7.0, 10.0, 8.0, 11.0, 9.0, 12.0], 0)
"""
java_transposed_matrix = self._java_matrix_wrapper.call("transpose")
return BlockMatrix(java_transposed_matrix, self.colsPerBlock, self.rowsPerBlock)

def toLocalMatrix(self):
"""
Collect the distributed matrix on the driver as a DenseMatrix.
Expand Down