Commit 607f503

dusenberrymw authored and Nick Pentreath committed
[SPARK-9656][MLLIB][PYTHON] Add missing methods to PySpark's Distributed Linear Algebra Classes
This PR adds the remaining group of methods to PySpark's distributed linear algebra classes as follows:

* `RowMatrix` <sup>**[1]**</sup>
  1. `computeGramianMatrix`
  2. `computeCovariance`
  3. `computeColumnSummaryStatistics`
  4. `columnSimilarities`
  5. `tallSkinnyQR` <sup>**[2]**</sup>
* `IndexedRowMatrix` <sup>**[3]**</sup>
  1. `computeGramianMatrix`
* `CoordinateMatrix`
  1. `transpose`
* `BlockMatrix`
  1. `validate`
  2. `cache`
  3. `persist`
  4. `transpose`

**[1]**: Note: `multiply`, `computeSVD`, and `computePrincipalComponents` are already part of PR #7963 for SPARK-6227.

**[2]**: Implementing `tallSkinnyQR` uncovered a bug in our PySpark `RowMatrix` constructor. As discussed on the dev list [here](http://apache-spark-developers-list.1001551.n3.nabble.com/K-Means-And-Class-Tags-td10038.html), there appears to be an issue with type erasure for RDDs coming from Java, and by extension from PySpark. Although we attempt to construct a `RowMatrix` from an `RDD[Vector]` in [PythonMLlibAPI](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala#L1115), the `Vector` type is erased, resulting in an `RDD[Object]`. Thus, when calling Scala's `tallSkinnyQR` from PySpark, we get a Java `ClassCastException` because an `Object` cannot be cast to a Spark `Vector`. As noted in the aforementioned dev list thread, this issue was also encountered with `DecisionTrees`, and the fix involved an explicit `retag` of the RDD with a `Vector` type. This PR therefore applies that fix to the `createRowMatrix` helper function in `PythonMLlibAPI`. `IndexedRowMatrix` and `CoordinateMatrix` do not appear to have this issue, likely because their related helper functions in `PythonMLlibAPI` create the RDDs explicitly from DataFrames with pattern matching, thus preserving the types. However, this fix may be out of scope for this single PR and may be better suited to a separate JIRA/PR, so I have marked this PR as WIP and am open to discussion.

**[3]**: Note: `multiply` and `computeSVD` are already part of PR #7963 for SPARK-6227.

Author: Mike Dusenberry <mwdusenb@us.ibm.com>

Closes #9441 from dusenberrymw/SPARK-9656_Add_Missing_Methods_to_PySpark_Distributed_Linear_Algebra.
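For orientation, here is a minimal usage sketch of the new `RowMatrix` methods (not part of the commit itself). It assumes a PySpark shell where a `SparkContext` is available as `sc`, and the values simply mirror the doctests added in the diff below.

```python
# Minimal sketch of the new RowMatrix API surface added in this PR.
# Assumes a PySpark shell with a SparkContext bound to `sc`.
from pyspark.mllib.linalg.distributed import RowMatrix

rows = sc.parallelize([[1, 2, 3], [4, 5, 6]])
mat = RowMatrix(rows)

stats = mat.computeColumnSummaryStatistics()  # MultivariateStatisticalSummary
print(stats.mean())                           # array([ 2.5,  3.5,  4.5])

gram = mat.computeGramianMatrix()             # local DenseMatrix holding A^T A
cov = mat.computeCovariance()                 # local covariance matrix
sims = mat.columnSimilarities()               # CoordinateMatrix of cosine similarities

decomp = mat.tallSkinnyQR(computeQ=True)      # QRDecomposition(Q: RowMatrix, R: Matrix)
Q, R = decomp.Q, decomp.R
```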
1 parent a234cc6 · commit 607f503

3 files changed: +301, -4 lines changed


mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala

Lines changed: 2 additions & 1 deletion
@@ -188,7 +188,8 @@ class IndexedRowMatrix @Since("1.0.0") (
   }
 
   /**
-   * Computes the Gramian matrix `A^T A`.
+   * Computes the Gramian matrix `A^T A`. Note that this cannot be
+   * computed on matrices with more than 65535 columns.
    */
   @Since("1.0.0")
   def computeGramianMatrix(): Matrix = {

python/pyspark/mllib/linalg/__init__.py

Lines changed: 31 additions & 1 deletion
@@ -38,12 +38,14 @@
 
 import numpy as np
 
+from pyspark import since
 from pyspark.sql.types import UserDefinedType, StructField, StructType, ArrayType, DoubleType, \
     IntegerType, ByteType, BooleanType
 
 
 __all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors',
-           'Matrix', 'DenseMatrix', 'SparseMatrix', 'Matrices']
+           'Matrix', 'DenseMatrix', 'SparseMatrix', 'Matrices',
+           'QRDecomposition']
 
 
 if sys.version_info[:2] == (2, 7):
@@ -1235,6 +1237,34 @@ def sparse(numRows, numCols, colPtrs, rowIndices, values):
         return SparseMatrix(numRows, numCols, colPtrs, rowIndices, values)
 
 
+class QRDecomposition(object):
+    """
+    .. note:: Experimental
+
+    Represents QR factors.
+    """
+    def __init__(self, Q, R):
+        self._Q = Q
+        self._R = R
+
+    @property
+    @since('2.0.0')
+    def Q(self):
+        """
+        An orthogonal matrix Q in a QR decomposition.
+        May be null if not computed.
+        """
+        return self._Q
+
+    @property
+    @since('2.0.0')
+    def R(self):
+        """
+        An upper triangular matrix R in a QR decomposition.
+        """
+        return self._R
+
+
 def _test():
     import doctest
     (failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS)
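The new `QRDecomposition` class is a plain holder for the two factors. A short sketch (not from the diff) of how it behaves when Q is not computed, which is what `tallSkinnyQR` returns by default:

```python
# Sketch only: QRDecomposition simply stores the Q and R factors.
# Q may be None, e.g. when tallSkinnyQR(computeQ=False) skips computing it.
from pyspark.mllib.linalg import DenseMatrix, QRDecomposition

R = DenseMatrix(2, 2, [5.0, 0.0, 10.0, 1.0])  # column-major: [[5, 10], [0, 1]], upper triangular
qr = QRDecomposition(None, R)                 # Q was not computed
assert qr.Q is None
print(qr.R)                                   # the stored R matrix
```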

python/pyspark/mllib/linalg/distributed.py

Lines changed: 268 additions & 2 deletions
@@ -26,9 +26,11 @@
 
 from py4j.java_gateway import JavaObject
 
-from pyspark import RDD
+from pyspark import RDD, since
 from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
-from pyspark.mllib.linalg import _convert_to_vector, Matrix
+from pyspark.mllib.linalg import _convert_to_vector, Matrix, QRDecomposition
+from pyspark.mllib.stat import MultivariateStatisticalSummary
+from pyspark.storagelevel import StorageLevel
 
 
 __all__ = ['DistributedMatrix', 'RowMatrix', 'IndexedRow',
@@ -151,6 +153,156 @@ def numCols(self):
         """
         return self._java_matrix_wrapper.call("numCols")
 
+    @since('2.0.0')
+    def computeColumnSummaryStatistics(self):
+        """
+        Computes column-wise summary statistics.
+
+        :return: :class:`MultivariateStatisticalSummary` object
+                 containing column-wise summary statistics.
+
+        >>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6]])
+        >>> mat = RowMatrix(rows)
+
+        >>> colStats = mat.computeColumnSummaryStatistics()
+        >>> colStats.mean()
+        array([ 2.5, 3.5, 4.5])
+        """
+        java_col_stats = self._java_matrix_wrapper.call("computeColumnSummaryStatistics")
+        return MultivariateStatisticalSummary(java_col_stats)
+
+    @since('2.0.0')
+    def computeCovariance(self):
+        """
+        Computes the covariance matrix, treating each row as an
+        observation. Note that this cannot be computed on matrices
+        with more than 65535 columns.
+
+        >>> rows = sc.parallelize([[1, 2], [2, 1]])
+        >>> mat = RowMatrix(rows)
+
+        >>> mat.computeCovariance()
+        DenseMatrix(2, 2, [0.5, -0.5, -0.5, 0.5], 0)
+        """
+        return self._java_matrix_wrapper.call("computeCovariance")
+
+    @since('2.0.0')
+    def computeGramianMatrix(self):
+        """
+        Computes the Gramian matrix `A^T A`. Note that this cannot be
+        computed on matrices with more than 65535 columns.
+
+        >>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6]])
+        >>> mat = RowMatrix(rows)
+
+        >>> mat.computeGramianMatrix()
+        DenseMatrix(3, 3, [17.0, 22.0, 27.0, 22.0, 29.0, 36.0, 27.0, 36.0, 45.0], 0)
+        """
+        return self._java_matrix_wrapper.call("computeGramianMatrix")
+
+    @since('2.0.0')
+    def columnSimilarities(self, threshold=0.0):
+        """
+        Compute similarities between columns of this matrix.
+
+        The threshold parameter is a trade-off knob between estimate
+        quality and computational cost.
+
+        The default threshold setting of 0 guarantees deterministically
+        correct results, but uses the brute-force approach of computing
+        normalized dot products.
+
+        Setting the threshold to positive values uses a sampling
+        approach and incurs strictly less computational cost than the
+        brute-force approach. However the similarities computed will
+        be estimates.
+
+        The sampling guarantees relative-error correctness for those
+        pairs of columns that have similarity greater than the given
+        similarity threshold.
+
+        To describe the guarantee, we set some notation:
+        * Let A be the smallest in magnitude non-zero element of
+          this matrix.
+        * Let B be the largest in magnitude non-zero element of
+          this matrix.
+        * Let L be the maximum number of non-zeros per row.
+
+        For example, for {0,1} matrices: A=B=1.
+        Another example, for the Netflix matrix: A=1, B=5
+
+        For those column pairs that are above the threshold, the
+        computed similarity is correct to within 20% relative error
+        with probability at least 1 - (0.981)^10/B^
+
+        The shuffle size is bounded by the *smaller* of the following
+        two expressions:
+
+        * O(n log(n) L / (threshold * A))
+        * O(m L^2^)
+
+        The latter is the cost of the brute-force approach, so for
+        non-zero thresholds, the cost is always cheaper than the
+        brute-force approach.
+
+        :param: threshold: Set to 0 for deterministic guaranteed
+                           correctness. Similarities above this
+                           threshold are estimated with the cost vs
+                           estimate quality trade-off described above.
+        :return: An n x n sparse upper-triangular CoordinateMatrix of
+                 cosine similarities between columns of this matrix.
+
+        >>> rows = sc.parallelize([[1, 2], [1, 5]])
+        >>> mat = RowMatrix(rows)
+
+        >>> sims = mat.columnSimilarities()
+        >>> sims.entries.first().value
+        0.91914503...
+        """
+        java_sims_mat = self._java_matrix_wrapper.call("columnSimilarities", float(threshold))
+        return CoordinateMatrix(java_sims_mat)
+
+    @since('2.0.0')
+    def tallSkinnyQR(self, computeQ=False):
+        """
+        Compute the QR decomposition of this RowMatrix.
+
+        The implementation is designed to optimize the QR decomposition
+        (factorization) for the RowMatrix of a tall and skinny shape.
+
+        Reference:
+         Paul G. Constantine, David F. Gleich. "Tall and skinny QR
+         factorizations in MapReduce architectures"
+         ([[http://dx.doi.org/10.1145/1996092.1996103]])
+
+        :param: computeQ: whether to computeQ
+        :return: QRDecomposition(Q: RowMatrix, R: Matrix), where
+                 Q = None if computeQ = false.
+
+        >>> rows = sc.parallelize([[3, -6], [4, -8], [0, 1]])
+        >>> mat = RowMatrix(rows)
+        >>> decomp = mat.tallSkinnyQR(True)
+        >>> Q = decomp.Q
+        >>> R = decomp.R
+
+        >>> # Test with absolute values
+        >>> absQRows = Q.rows.map(lambda row: abs(row.toArray()).tolist())
+        >>> absQRows.collect()
+        [[0.6..., 0.0], [0.8..., 0.0], [0.0, 1.0]]
+
+        >>> # Test with absolute values
+        >>> abs(R.toArray()).tolist()
+        [[5.0, 10.0], [0.0, 1.0]]
+        """
+        decomp = JavaModelWrapper(self._java_matrix_wrapper.call("tallSkinnyQR", computeQ))
+        if computeQ:
+            java_Q = decomp.call("Q")
+            Q = RowMatrix(java_Q)
+        else:
+            Q = None
+        R = decomp.call("R")
+        return QRDecomposition(Q, R)
+
 
 class IndexedRow(object):
     """
@@ -311,6 +463,21 @@ def columnSimilarities(self):
         java_coordinate_matrix = self._java_matrix_wrapper.call("columnSimilarities")
         return CoordinateMatrix(java_coordinate_matrix)
 
+    @since('2.0.0')
+    def computeGramianMatrix(self):
+        """
+        Computes the Gramian matrix `A^T A`. Note that this cannot be
+        computed on matrices with more than 65535 columns.
+
+        >>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
+        ...                        IndexedRow(1, [4, 5, 6])])
+        >>> mat = IndexedRowMatrix(rows)
+
+        >>> mat.computeGramianMatrix()
+        DenseMatrix(3, 3, [17.0, 22.0, 27.0, 22.0, 29.0, 36.0, 27.0, 36.0, 45.0], 0)
+        """
+        return self._java_matrix_wrapper.call("computeGramianMatrix")
+
     def toRowMatrix(self):
         """
         Convert this matrix to a RowMatrix.
@@ -514,6 +681,26 @@ def numCols(self):
         """
         return self._java_matrix_wrapper.call("numCols")
 
+    @since('2.0.0')
+    def transpose(self):
+        """
+        Transpose this CoordinateMatrix.
+
+        >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
+        ...                           MatrixEntry(1, 0, 2),
+        ...                           MatrixEntry(2, 1, 3.7)])
+        >>> mat = CoordinateMatrix(entries)
+        >>> mat_transposed = mat.transpose()
+
+        >>> print(mat_transposed.numRows())
+        2
+
+        >>> print(mat_transposed.numCols())
+        3
+        """
+        java_transposed_matrix = self._java_matrix_wrapper.call("transpose")
+        return CoordinateMatrix(java_transposed_matrix)
+
     def toRowMatrix(self):
         """
         Convert this matrix to a RowMatrix.
@@ -789,6 +976,33 @@ def numCols(self):
         """
         return self._java_matrix_wrapper.call("numCols")
 
+    @since('2.0.0')
+    def cache(self):
+        """
+        Caches the underlying RDD.
+        """
+        self._java_matrix_wrapper.call("cache")
+        return self
+
+    @since('2.0.0')
+    def persist(self, storageLevel):
+        """
+        Persists the underlying RDD with the specified storage level.
+        """
+        if not isinstance(storageLevel, StorageLevel):
+            raise TypeError("`storageLevel` should be a StorageLevel, got %s" % type(storageLevel))
+        javaStorageLevel = self._java_matrix_wrapper._sc._getJavaStorageLevel(storageLevel)
+        self._java_matrix_wrapper.call("persist", javaStorageLevel)
+        return self
+
+    @since('2.0.0')
+    def validate(self):
+        """
+        Validates the block matrix info against the matrix data (`blocks`)
+        and throws an exception if any error is found.
+        """
+        self._java_matrix_wrapper.call("validate")
+
     def add(self, other):
         """
         Adds two block matrices together. The matrices must have the
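Unlike the other additions, `cache`, `persist`, and `validate` carry no doctests, so here is a brief sketch of how they compose. It is not part of the diff and assumes a `SparkContext` bound to `sc`.

```python
# Sketch: validating and persisting a BlockMatrix built from dense blocks.
from pyspark import StorageLevel
from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix

blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
                         ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
mat = BlockMatrix(blocks, 3, 2)

mat.validate()                             # raises an exception if the block info is inconsistent
mat.persist(StorageLevel.MEMORY_AND_DISK)  # returns self; mat.cache() uses the default level
```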
@@ -822,6 +1036,41 @@ def add(self, other):
         java_block_matrix = self._java_matrix_wrapper.call("add", other_java_block_matrix)
         return BlockMatrix(java_block_matrix, self.rowsPerBlock, self.colsPerBlock)
 
+    @since('2.0.0')
+    def subtract(self, other):
+        """
+        Subtracts the given block matrix `other` from this block matrix:
+        `this - other`. The matrices must have the same size and
+        matching `rowsPerBlock` and `colsPerBlock` values. If one of
+        the sub matrix blocks that are being subtracted is a
+        SparseMatrix, the resulting sub matrix block will also be a
+        SparseMatrix, even if it is being subtracted from a DenseMatrix.
+        If two dense sub matrix blocks are subtracted, the output block
+        will also be a DenseMatrix.
+
+        >>> dm1 = Matrices.dense(3, 2, [3, 1, 5, 4, 6, 2])
+        >>> dm2 = Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12])
+        >>> sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 1, 2], [1, 2, 3])
+        >>> blocks1 = sc.parallelize([((0, 0), dm1), ((1, 0), dm2)])
+        >>> blocks2 = sc.parallelize([((0, 0), dm2), ((1, 0), dm1)])
+        >>> blocks3 = sc.parallelize([((0, 0), sm), ((1, 0), dm2)])
+        >>> mat1 = BlockMatrix(blocks1, 3, 2)
+        >>> mat2 = BlockMatrix(blocks2, 3, 2)
+        >>> mat3 = BlockMatrix(blocks3, 3, 2)
+
+        >>> mat1.subtract(mat2).toLocalMatrix()
+        DenseMatrix(6, 2, [-4.0, -7.0, -4.0, 4.0, 7.0, 4.0, -6.0, -5.0, -10.0, 6.0, 5.0, 10.0], 0)
+
+        >>> mat2.subtract(mat3).toLocalMatrix()
+        DenseMatrix(6, 2, [6.0, 8.0, 9.0, -4.0, -7.0, -4.0, 10.0, 9.0, 9.0, -6.0, -5.0, -10.0], 0)
+        """
+        if not isinstance(other, BlockMatrix):
+            raise TypeError("Other should be a BlockMatrix, got %s" % type(other))
+
+        other_java_block_matrix = other._java_matrix_wrapper._java_model
+        java_block_matrix = self._java_matrix_wrapper.call("subtract", other_java_block_matrix)
+        return BlockMatrix(java_block_matrix, self.rowsPerBlock, self.colsPerBlock)
+
     def multiply(self, other):
         """
         Left multiplies this BlockMatrix by `other`, another
@@ -857,6 +1106,23 @@ def multiply(self, other):
         java_block_matrix = self._java_matrix_wrapper.call("multiply", other_java_block_matrix)
         return BlockMatrix(java_block_matrix, self.rowsPerBlock, self.colsPerBlock)
 
+    @since('2.0.0')
+    def transpose(self):
+        """
+        Transpose this BlockMatrix. Returns a new BlockMatrix
+        instance sharing the same underlying data. Is a lazy operation.
+
+        >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
+        ...                          ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
+        >>> mat = BlockMatrix(blocks, 3, 2)
+
+        >>> mat_transposed = mat.transpose()
+        >>> mat_transposed.toLocalMatrix()
+        DenseMatrix(2, 6, [1.0, 4.0, 2.0, 5.0, 3.0, 6.0, 7.0, 10.0, 8.0, 11.0, 9.0, 12.0], 0)
+        """
+        java_transposed_matrix = self._java_matrix_wrapper.call("transpose")
+        return BlockMatrix(java_transposed_matrix, self.colsPerBlock, self.rowsPerBlock)
+
     def toLocalMatrix(self):
         """
         Collect the distributed matrix on the driver as a DenseMatrix.
