[SPARK-9656] [MLlib] [Python] Add missing methods to PySpark's Distributed Linear Algebra Classes #9441
Status: Closed

dusenberrymw wants to merge 8 commits into apache:master from dusenberrymw:SPARK-9656_Add_Missing_Methods_to_PySpark_Distributed_Linear_Algebra
Commits (8):
- f503890: Adding remaining methods to PySpark BlockMatrix: cache, persist, vali… (dusenberrymw)
- edf3e45: Adding remaining method to PySpark CoordinateMatrix: transpose. (dusenberrymw)
- e153279: Adding remaining method to PySpark IndexedRowMatrix: computeGramianMa… (dusenberrymw)
- 587bea5: Adding remaining methods to PySpark RowMatrix: computeGramianMatrix, … (dusenberrymw)
- 12baa78: Improving robustness of PySpark test for Python 2.6. (dusenberrymw)
- fca41ca: Adding experimental tag to QRDecomposition. (dusenberrymw)
- f1410bf: Adding @since annotations, and adding a comment to Scala. (dusenberrymw)
- c0c9565: Adding the subtract method. (dusenberrymw)
Changes to `python/pyspark/mllib/linalg/distributed.py`:

```diff
@@ -26,9 +26,11 @@
 from py4j.java_gateway import JavaObject

-from pyspark import RDD
+from pyspark import RDD, since
 from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
-from pyspark.mllib.linalg import _convert_to_vector, Matrix
+from pyspark.mllib.linalg import _convert_to_vector, Matrix, QRDecomposition
 from pyspark.mllib.stat import MultivariateStatisticalSummary
 from pyspark.storagelevel import StorageLevel


 __all__ = ['DistributedMatrix', 'RowMatrix', 'IndexedRow',
```
```diff
@@ -151,6 +153,156 @@ def numCols(self):
         """
         return self._java_matrix_wrapper.call("numCols")

+    @since('2.0.0')
+    def computeColumnSummaryStatistics(self):
+        """
+        Computes column-wise summary statistics.
+
+        :return: :class:`MultivariateStatisticalSummary` object
+                 containing column-wise summary statistics.
+
+        >>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6]])
+        >>> mat = RowMatrix(rows)
+
+        >>> colStats = mat.computeColumnSummaryStatistics()
+        >>> colStats.mean()
+        array([ 2.5, 3.5, 4.5])
+        """
+        java_col_stats = self._java_matrix_wrapper.call("computeColumnSummaryStatistics")
+        return MultivariateStatisticalSummary(java_col_stats)
```
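The doctest result above can be cross-checked without a Spark cluster. This is a small standalone NumPy sketch (not part of the PR) reproducing the column-wise mean on the same data:

```python
import numpy as np

# Same data as the doctest above: each inner list is one row (observation).
rows = np.array([[1, 2, 3], [4, 5, 6]], dtype=float)

# Column-wise mean, which should match colStats.mean() from the doctest.
col_means = rows.mean(axis=0)
print(col_means)  # [2.5 3.5 4.5]
```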
```diff
+    @since('2.0.0')
+    def computeCovariance(self):
+        """
+        Computes the covariance matrix, treating each row as an
+        observation. Note that this cannot be computed on matrices
+        with more than 65535 columns.
+
+        >>> rows = sc.parallelize([[1, 2], [2, 1]])
+        >>> mat = RowMatrix(rows)
+
+        >>> mat.computeCovariance()
+        DenseMatrix(2, 2, [0.5, -0.5, -0.5, 0.5], 0)
+        """
+        return self._java_matrix_wrapper.call("computeCovariance")
```
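As a hypothetical sanity check (plain NumPy, not part of the PR), the doctest's covariance can be reproduced with sample covariance over the rows:

```python
import numpy as np

# Rows are observations, columns are variables, as in computeCovariance.
rows = np.array([[1, 2], [2, 1]], dtype=float)

# Sample covariance (normalized by N - 1), matching the doctest values.
cov = np.cov(rows, rowvar=False)
print(cov.tolist())  # [[0.5, -0.5], [-0.5, 0.5]]
```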
```diff
+    @since('2.0.0')
+    def computeGramianMatrix(self):
+        """
+        Computes the Gramian matrix `A^T A`. Note that this cannot be
+        computed on matrices with more than 65535 columns.
+
+        >>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6]])
+        >>> mat = RowMatrix(rows)
+
+        >>> mat.computeGramianMatrix()
+        DenseMatrix(3, 3, [17.0, 22.0, 27.0, 22.0, 29.0, 36.0, 27.0, 36.0, 45.0], 0)
+        """
+        return self._java_matrix_wrapper.call("computeGramianMatrix")
```
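The Gramian values in the doctest can be verified directly; this standalone NumPy sketch (not part of the PR) computes `A^T A` for the same matrix:

```python
import numpy as np

# A is the doctest matrix; the Gramian is A^T A (a 3 x 3 symmetric matrix).
A = np.array([[1, 2, 3], [4, 5, 6]], dtype=float)
gram = A.T @ A
print(gram.tolist())
# [[17.0, 22.0, 27.0], [22.0, 29.0, 36.0], [27.0, 36.0, 45.0]]
```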
```diff
+    @since('2.0.0')
+    def columnSimilarities(self, threshold=0.0):
+        """
+        Compute similarities between columns of this matrix.
+
+        The threshold parameter is a trade-off knob between estimate
+        quality and computational cost.
+
+        The default threshold setting of 0 guarantees deterministically
+        correct results, but uses the brute-force approach of computing
+        normalized dot products.
+
+        Setting the threshold to positive values uses a sampling
+        approach and incurs strictly less computational cost than the
+        brute-force approach. However, the similarities computed will
+        be estimates.
+
+        The sampling guarantees relative-error correctness for those
+        pairs of columns that have similarity greater than the given
+        similarity threshold.
+
+        To describe the guarantee, we set some notation:
+
+        * Let A be the smallest in magnitude non-zero element of
+          this matrix.
+        * Let B be the largest in magnitude non-zero element of
+          this matrix.
+        * Let L be the maximum number of non-zeros per row.
+
+        For example, for {0,1} matrices: A=B=1.
+        Another example, for the Netflix matrix: A=1, B=5.
+
+        For those column pairs that are above the threshold, the
+        computed similarity is correct to within 20% relative error
+        with probability at least 1 - (0.981)^(10/B).
+
+        The shuffle size is bounded by the *smaller* of the following
+        two expressions:
+
+        * O(n log(n) L / (threshold * A))
+        * O(m L^2)
+
+        The latter is the cost of the brute-force approach, so for
+        non-zero thresholds, the cost is always cheaper than the
+        brute-force approach.
+
+        :param threshold: Set to 0 for deterministic guaranteed
+                          correctness. Similarities above this
+                          threshold are estimated with the cost vs
+                          estimate quality trade-off described above.
+        :return: An n x n sparse upper-triangular CoordinateMatrix of
+                 cosine similarities between columns of this matrix.
+
+        >>> rows = sc.parallelize([[1, 2], [1, 5]])
+        >>> mat = RowMatrix(rows)
+
+        >>> sims = mat.columnSimilarities()
+        >>> sims.entries.first().value
+        0.91914503...
+        """
+        java_sims_mat = self._java_matrix_wrapper.call("columnSimilarities", float(threshold))
+        return CoordinateMatrix(java_sims_mat)
```
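The doctest value above corresponds to the brute-force (threshold=0) path: a plain normalized dot product between the two columns. A standalone NumPy sketch (not part of the PR) reproduces it:

```python
import numpy as np

# Cosine similarity between the two columns of the doctest matrix
# [[1, 2], [1, 5]]: dot(col0, col1) / (||col0|| * ||col1||).
A = np.array([[1, 2], [1, 5]], dtype=float)
col0, col1 = A[:, 0], A[:, 1]
sim = col0 @ col1 / (np.linalg.norm(col0) * np.linalg.norm(col1))
print(round(sim, 8))  # 0.91914503
```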
```diff
+    @since('2.0.0')
+    def tallSkinnyQR(self, computeQ=False):
+        """
+        Compute the QR decomposition of this RowMatrix.
+
+        The implementation is designed to optimize the QR decomposition
+        (factorization) for the RowMatrix of a tall and skinny shape.
+
+        Reference:
+         Paul G. Constantine, David F. Gleich. "Tall and skinny QR
+         factorizations in MapReduce architectures"
+         (http://dx.doi.org/10.1145/1996092.1996103)
+
+        :param computeQ: whether to compute Q
+        :return: QRDecomposition(Q: RowMatrix, R: Matrix), where
+                 Q = None if computeQ = False.
+
+        >>> rows = sc.parallelize([[3, -6], [4, -8], [0, 1]])
+        >>> mat = RowMatrix(rows)
+        >>> decomp = mat.tallSkinnyQR(True)
+        >>> Q = decomp.Q
+        >>> R = decomp.R
+
+        >>> # Test with absolute values
+        >>> absQRows = Q.rows.map(lambda row: abs(row.toArray()).tolist())
+        >>> absQRows.collect()
+        [[0.6..., 0.0], [0.8..., 0.0], [0.0, 1.0]]
+
+        >>> # Test with absolute values
+        >>> abs(R.toArray()).tolist()
+        [[5.0, 10.0], [0.0, 1.0]]
+        """
+        decomp = JavaModelWrapper(self._java_matrix_wrapper.call("tallSkinnyQR", computeQ))
+        if computeQ:
+            java_Q = decomp.call("Q")
+            Q = RowMatrix(java_Q)
+        else:
+            Q = None
+        R = decomp.call("R")
+        return QRDecomposition(Q, R)
```
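The QR doctest compares absolute values because the signs of the Q and R factors are only determined up to a flip. A standalone NumPy sketch (not part of the PR) shows the same factorization on the doctest matrix:

```python
import numpy as np

# Reduced QR of the doctest matrix; compare absolute values since the
# factor signs are conventional, as the doctest above also does.
A = np.array([[3, -6], [4, -8], [0, 1]], dtype=float)
Q, R = np.linalg.qr(A)

print(np.abs(Q).round(6).tolist())  # [[0.6, 0.0], [0.8, 0.0], [0.0, 1.0]]
print(np.abs(R).round(6).tolist())  # [[5.0, 10.0], [0.0, 1.0]]

# Regardless of sign conventions, Q @ R reconstructs A.
assert np.allclose(Q @ R, A)
```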
```diff
 class IndexedRow(object):
     """

@@ -311,6 +463,21 @@ def columnSimilarities(self):
         java_coordinate_matrix = self._java_matrix_wrapper.call("columnSimilarities")
         return CoordinateMatrix(java_coordinate_matrix)

+    @since('2.0.0')
+    def computeGramianMatrix(self):
+        """
+        Computes the Gramian matrix `A^T A`. Note that this cannot be
+        computed on matrices with more than 65535 columns.
+
+        >>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
+        ...                        IndexedRow(1, [4, 5, 6])])
+        >>> mat = IndexedRowMatrix(rows)
+
+        >>> mat.computeGramianMatrix()
+        DenseMatrix(3, 3, [17.0, 22.0, 27.0, 22.0, 29.0, 36.0, 27.0, 36.0, 45.0], 0)
+        """
+        return self._java_matrix_wrapper.call("computeGramianMatrix")

     def toRowMatrix(self):
         """
         Convert this matrix to a RowMatrix.
```
```diff
@@ -514,6 +681,26 @@ def numCols(self):
         """
         return self._java_matrix_wrapper.call("numCols")

+    @since('2.0.0')
+    def transpose(self):
+        """
+        Transpose this CoordinateMatrix.
+
+        >>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
+        ...                           MatrixEntry(1, 0, 2),
+        ...                           MatrixEntry(2, 1, 3.7)])
+        >>> mat = CoordinateMatrix(entries)
+        >>> mat_transposed = mat.transpose()
+
+        >>> print(mat_transposed.numRows())
+        2
+
+        >>> print(mat_transposed.numCols())
+        3
+        """
+        java_transposed_matrix = self._java_matrix_wrapper.call("transpose")
+        return CoordinateMatrix(java_transposed_matrix)

     def toRowMatrix(self):
         """
         Convert this matrix to a RowMatrix.
```
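Conceptually, transposing a coordinate matrix just swaps the row and column index of every `(i, j, value)` entry. This hypothetical pure-Python sketch (not Spark code) mirrors the doctest above:

```python
# Entries from the doctest, as (row, col, value) tuples.
entries = [(0, 0, 1.2), (1, 0, 2.0), (2, 1, 3.7)]

# Transposition swaps the row and column index of each entry.
transposed = [(j, i, v) for (i, j, v) in entries]

# Dimensions inferred from the largest index, as when no sizes are given.
num_rows = max(i for (i, _, _) in transposed) + 1
num_cols = max(j for (_, j, _) in transposed) + 1
print(num_rows, num_cols)  # 2 3
```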
```diff
@@ -789,6 +976,33 @@ def numCols(self):
         """
         return self._java_matrix_wrapper.call("numCols")

+    @since('2.0.0')
+    def cache(self):
+        """
+        Caches the underlying RDD.
+        """
+        self._java_matrix_wrapper.call("cache")
+        return self
+
+    @since('2.0.0')
+    def persist(self, storageLevel):
+        """
+        Persists the underlying RDD with the specified storage level.
+        """
+        if not isinstance(storageLevel, StorageLevel):
+            raise TypeError("`storageLevel` should be a StorageLevel, got %s" % type(storageLevel))
+        javaStorageLevel = self._java_matrix_wrapper._sc._getJavaStorageLevel(storageLevel)
+        self._java_matrix_wrapper.call("persist", javaStorageLevel)
+        return self
+
+    @since('2.0.0')
+    def validate(self):
+        """
+        Validates the block matrix info against the matrix data (`blocks`)
+        and throws an exception if any error is found.
+        """
+        self._java_matrix_wrapper.call("validate")

     def add(self, other):
         """
         Adds two block matrices together. The matrices must have the
```
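A minimal standalone sketch of the pattern `cache()`/`persist()` follow: validate the argument type, delegate the side effect, and return `self` so calls can be chained. `FakeMatrix` is hypothetical (not Spark code) and stands in for the JVM-backed wrapper:

```python
class FakeMatrix(object):
    """Hypothetical stand-in illustrating the fluent cache/persist pattern."""

    def __init__(self):
        self.storage_level = None  # stands in for the JVM-side storage level

    def cache(self):
        # cache() is persist() with a default storage level.
        return self.persist("MEMORY_ONLY")

    def persist(self, storage_level):
        # Type-check up front, as the PR's persist() does for StorageLevel.
        if not isinstance(storage_level, str):
            raise TypeError("`storage_level` should be a str, got %s"
                            % type(storage_level))
        self.storage_level = storage_level
        return self  # returning self allows chained calls

mat = FakeMatrix().cache()
print(mat.storage_level)  # MEMORY_ONLY
```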
```diff
@@ -822,6 +1036,41 @@ def add(self, other):
         java_block_matrix = self._java_matrix_wrapper.call("add", other_java_block_matrix)
         return BlockMatrix(java_block_matrix, self.rowsPerBlock, self.colsPerBlock)

+    @since('2.0.0')
+    def subtract(self, other):
+        """
+        Subtracts the given block matrix `other` from this block matrix:
+        `this - other`. The matrices must have the same size and
+        matching `rowsPerBlock` and `colsPerBlock` values. If one of
+        the sub matrix blocks that are being subtracted is a
+        SparseMatrix, the resulting sub matrix block will also be a
+        SparseMatrix, even if it is being subtracted from a DenseMatrix.
+        If two dense sub matrix blocks are subtracted, the output block
+        will also be a DenseMatrix.
+
+        >>> dm1 = Matrices.dense(3, 2, [3, 1, 5, 4, 6, 2])
+        >>> dm2 = Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12])
+        >>> sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 1, 2], [1, 2, 3])
+        >>> blocks1 = sc.parallelize([((0, 0), dm1), ((1, 0), dm2)])
+        >>> blocks2 = sc.parallelize([((0, 0), dm2), ((1, 0), dm1)])
+        >>> blocks3 = sc.parallelize([((0, 0), sm), ((1, 0), dm2)])
+        >>> mat1 = BlockMatrix(blocks1, 3, 2)
+        >>> mat2 = BlockMatrix(blocks2, 3, 2)
+        >>> mat3 = BlockMatrix(blocks3, 3, 2)
+
+        >>> mat1.subtract(mat2).toLocalMatrix()
+        DenseMatrix(6, 2, [-4.0, -7.0, -4.0, 4.0, 7.0, 4.0, -6.0, -5.0, -10.0, 6.0, 5.0, 10.0], 0)
+
+        >>> mat2.subtract(mat3).toLocalMatrix()
+        DenseMatrix(6, 2, [6.0, 8.0, 9.0, -4.0, -7.0, -4.0, 10.0, 9.0, 9.0, -6.0, -5.0, -10.0], 0)
+        """
+        if not isinstance(other, BlockMatrix):
+            raise TypeError("Other should be a BlockMatrix, got %s" % type(other))
+
+        other_java_block_matrix = other._java_matrix_wrapper._java_model
+        java_block_matrix = self._java_matrix_wrapper.call("subtract", other_java_block_matrix)
+        return BlockMatrix(java_block_matrix, self.rowsPerBlock, self.colsPerBlock)

     def multiply(self, other):
         """
         Left multiplies this BlockMatrix by `other`, another
```
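The first `subtract` doctest can be cross-checked in plain NumPy (not part of the PR). `Matrices.dense` stores values in column-major order, so the blocks are reshaped with `order='F'` and stacked vertically, matching block indices (0, 0) and (1, 0):

```python
import numpy as np

# Blocks from the doctest, reshaped column-major as Matrices.dense stores them.
dm1 = np.array([3, 1, 5, 4, 6, 2], dtype=float).reshape(3, 2, order='F')
dm2 = np.array([7, 8, 9, 10, 11, 12], dtype=float).reshape(3, 2, order='F')

mat1 = np.vstack([dm1, dm2])  # block (0, 0) on top of block (1, 0)
mat2 = np.vstack([dm2, dm1])

diff = mat1 - mat2
# Flattened column-major, matching the DenseMatrix values in the doctest.
print(diff.flatten(order='F').tolist())
# [-4.0, -7.0, -4.0, 4.0, 7.0, 4.0, -6.0, -5.0, -10.0, 6.0, 5.0, 10.0]
```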
```diff
@@ -857,6 +1106,23 @@ def multiply(self, other):
         java_block_matrix = self._java_matrix_wrapper.call("multiply", other_java_block_matrix)
         return BlockMatrix(java_block_matrix, self.rowsPerBlock, self.colsPerBlock)

+    @since('2.0.0')
+    def transpose(self):
+        """
+        Transpose this BlockMatrix. Returns a new BlockMatrix
+        instance sharing the same underlying data. Is a lazy operation.
+
+        >>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
+        ...                          ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
+        >>> mat = BlockMatrix(blocks, 3, 2)
+
+        >>> mat_transposed = mat.transpose()
+        >>> mat_transposed.toLocalMatrix()
+        DenseMatrix(2, 6, [1.0, 4.0, 2.0, 5.0, 3.0, 6.0, 7.0, 10.0, 8.0, 11.0, 9.0, 12.0], 0)
+        """
+        java_transposed_matrix = self._java_matrix_wrapper.call("transpose")
+        return BlockMatrix(java_transposed_matrix, self.colsPerBlock, self.rowsPerBlock)

     def toLocalMatrix(self):
         """
         Collect the distributed matrix on the driver as a DenseMatrix.
```
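The transpose doctest output can likewise be verified with a standalone NumPy sketch (not part of the PR), again using column-major storage for the blocks:

```python
import numpy as np

# The two vertically stacked blocks from the doctest, column-major.
b0 = np.array([1, 2, 3, 4, 5, 6], dtype=float).reshape(3, 2, order='F')
b1 = np.array([7, 8, 9, 10, 11, 12], dtype=float).reshape(3, 2, order='F')

local = np.vstack([b0, b1])  # the 6 x 2 local matrix
transposed = local.T         # 2 x 6

# Flattened column-major, matching the DenseMatrix values in the doctest.
print(transposed.flatten(order='F').tolist())
# [1.0, 4.0, 2.0, 5.0, 3.0, 6.0, 7.0, 10.0, 8.0, 11.0, 9.0, 12.0]
```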
Review comment (on a blank line in the doctests): Is this blank line intentional?

dusenberrymw: Yeah, I like the visual clarity when viewing these tests on the Python docs, as it helps indicate that the following two tests rely on the data structures formed above. This is generally the pattern I've followed with these classes for cases with >1 test.