
Commit ab6cde0

[SPARK-3974] Modifications cleaning code up, making size calculation more robust
1 parent: 9ae85aa · commit: ab6cde0

2 files changed: +78, -215 lines

mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala

Lines changed: 66 additions & 173 deletions
@@ -20,97 +20,51 @@ package org.apache.spark.mllib.linalg.distributed
 import breeze.linalg.{DenseMatrix => BDM}
 
 import org.apache.spark._
-import org.apache.spark.mllib.linalg.DenseMatrix
+import org.apache.spark.mllib.linalg._
 import org.apache.spark.mllib.rdd.RDDFunctions._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
 
 /**
- * Represents a local matrix that makes up one block of a distributed BlockMatrix
- *
- * @param blockRowIndex The row index of this block. Must be zero based.
- * @param blockColIndex The column index of this block. Must be zero based.
- * @param mat The underlying local matrix
- */
-case class SubMatrix(blockRowIndex: Int, blockColIndex: Int, mat: DenseMatrix) extends Serializable
-
-/**
- * A partitioner that decides how the matrix is distributed in the cluster
+ * A grid partitioner, which stores every block in a separate partition.
  *
- * @param numPartitions Number of partitions
+ * @param numRowBlocks Number of blocks that form the rows of the matrix.
+ * @param numColBlocks Number of blocks that form the columns of the matrix.
  * @param rowPerBlock Number of rows that make up each block.
  * @param colPerBlock Number of columns that make up each block.
  */
-private[mllib] abstract class BlockMatrixPartitioner(
-    override val numPartitions: Int,
+private[mllib] class GridPartitioner(
+    val numRowBlocks: Int,
+    val numColBlocks: Int,
     val rowPerBlock: Int,
-    val colPerBlock: Int) extends Partitioner {
-  val name: String
+    val colPerBlock: Int,
+    override val numPartitions: Int) extends Partitioner {
 
   /**
    * Returns the index of the partition the SubMatrix belongs to.
    *
-   * @param key The key for the SubMatrix. Can be its row index, column index or position in the
-   *            grid.
+   * @param key The key for the SubMatrix. Can be its position in the grid (its column major index)
+   *            or a tuple of three integers that are the final row index after the multiplication,
+   *            the index of the block to multiply with, and the final column index after the
+   *            multiplication.
    * @return The index of the partition, which the SubMatrix belongs to.
    */
   override def getPartition(key: Any): Int = {
-    Utils.nonNegativeMod(key.asInstanceOf[Int], numPartitions)
-  }
-}
-
-/**
- * A grid partitioner, which stores every block in a separate partition.
- *
- * @param numRowBlocks Number of blocks that form the rows of the matrix.
- * @param numColBlocks Number of blocks that form the columns of the matrix.
- * @param rowPerBlock Number of rows that make up each block.
- * @param colPerBlock Number of columns that make up each block.
- */
-class GridPartitioner(
-    val numRowBlocks: Int,
-    val numColBlocks: Int,
-    override val rowPerBlock: Int,
-    override val colPerBlock: Int)
-  extends BlockMatrixPartitioner(numRowBlocks * numColBlocks, rowPerBlock, colPerBlock) {
-
-  override val name = "grid"
-
-  override val numPartitions = numRowBlocks * numColBlocks
-
-  /** Checks whether the partitioners have the same characteristics */
-  override def equals(obj: Any): Boolean = {
-    obj match {
-      case r: GridPartitioner =>
-        (this.numPartitions == r.numPartitions) && (this.rowPerBlock == r.rowPerBlock) &&
-          (this.colPerBlock == r.colPerBlock)
+    key match {
+      case ind: (Int, Int) =>
+        Utils.nonNegativeMod(ind._1 + ind._2 * numRowBlocks, numPartitions)
+      case indices: (Int, Int, Int) =>
+        Utils.nonNegativeMod(indices._1 + indices._3 * numRowBlocks, numPartitions)
       case _ =>
-        false
+        throw new IllegalArgumentException("Unrecognized key")
     }
   }
-}
-
-/**
- * A specialized partitioner that stores all blocks in the same row in just one partition.
- *
- * @param numPartitions Number of partitions. Should be set as the number of blocks that form
- *                      the rows of the matrix.
- * @param rowPerBlock Number of rows that make up each block.
- * @param colPerBlock Number of columns that make up each block.
- */
-class RowBasedPartitioner(
-    override val numPartitions: Int,
-    override val rowPerBlock: Int,
-    override val colPerBlock: Int)
-  extends BlockMatrixPartitioner(numPartitions, rowPerBlock, colPerBlock) {
-
-  override val name = "row"
 
   /** Checks whether the partitioners have the same characteristics */
   override def equals(obj: Any): Boolean = {
     obj match {
-      case r: RowBasedPartitioner =>
+      case r: GridPartitioner =>
         (this.numPartitions == r.numPartitions) && (this.rowPerBlock == r.rowPerBlock) &&
           (this.colPerBlock == r.colPerBlock)
       case _ =>
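
Note on the new partitioning scheme above: getPartition now maps a block's (rowIndex, colIndex) key to its column-major position in the grid, rowIndex + colIndex * numRowBlocks, taken modulo numPartitions. Below is a minimal standalone sketch of that mapping; the object and helper names are made up, and nonNegativeMod is re-implemented here only so the snippet runs without Spark's Utils.

// Hypothetical helper mirroring the column-major mapping used by GridPartitioner.getPartition.
object GridPartitionSketch {
  // Same behavior as Spark's Utils.nonNegativeMod, re-implemented for a dependency-free sketch.
  private def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    if (rawMod < 0) rawMod + mod else rawMod
  }

  // Column-major block index, wrapped into the available partitions.
  def partitionFor(blockRow: Int, blockCol: Int, numRowBlocks: Int, numPartitions: Int): Int =
    nonNegativeMod(blockRow + blockCol * numRowBlocks, numPartitions)

  def main(args: Array[String]): Unit = {
    // A 3 x 2 grid of blocks over 6 partitions: every block lands in its own partition.
    for (col <- 0 until 2; row <- 0 until 3) {
      println(s"block ($row, $col) -> partition ${partitionFor(row, col, 3, 6)}")
    }
  }
}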
@@ -119,36 +73,6 @@ class RowBasedPartitioner(
   }
 }
 
-/**
- * A specialized partitioner that stores all blocks in the same column in just one partition.
- *
- * @param numPartitions Number of partitions. Should be set as the number of blocks that form
- *                      the columns of the matrix.
- * @param rowPerBlock Number of rows that make up each block.
- * @param colPerBlock Number of columns that make up each block.
- */
-class ColumnBasedPartitioner(
-    override val numPartitions: Int,
-    override val rowPerBlock: Int,
-    override val colPerBlock: Int)
-  extends BlockMatrixPartitioner(numPartitions, rowPerBlock, colPerBlock) {
-
-  override val name = "column"
-
-  /** Checks whether the partitioners have the same characteristics */
-  override def equals(obj: Any): Boolean = {
-    obj match {
-      case p: ColumnBasedPartitioner =>
-        (this.numPartitions == p.numPartitions) && (this.rowPerBlock == p.rowPerBlock) &&
-          (this.colPerBlock == p.colPerBlock)
-      case r: RowBasedPartitioner =>
-        (this.numPartitions == r.numPartitions) && (this.colPerBlock == r.rowPerBlock)
-      case _ =>
-        false
-    }
-  }
-}
-
 /**
  * Represents a distributed matrix in blocks of local matrices.
  *
@@ -159,7 +83,9 @@ class ColumnBasedPartitioner(
 class BlockMatrix(
     val numRowBlocks: Int,
     val numColBlocks: Int,
-    val rdd: RDD[SubMatrix]) extends DistributedMatrix with Logging {
+    val rdd: RDD[((Int, Int), Matrix)]) extends DistributedMatrix with Logging {
+
+  type SubMatrix = ((Int, Int), Matrix) // ((blockRowIndex, blockColIndex), matrix)
 
   /**
    * Alternate constructor for BlockMatrix without the input of a partitioner. Will use a Grid
@@ -168,125 +94,92 @@ class BlockMatrix(
    * @param numRowBlocks Number of blocks that form the rows of this matrix
    * @param numColBlocks Number of blocks that form the columns of this matrix
    * @param rdd The RDD of SubMatrices (local matrices) that form this matrix
-   * @param partitioner A partitioner that specifies how SubMatrices are stored in the cluster
+   * @param rowPerBlock Number of rows that make up each block.
+   * @param colPerBlock Number of columns that make up each block.
    */
   def this(
       numRowBlocks: Int,
       numColBlocks: Int,
-      rdd: RDD[SubMatrix],
-      partitioner: BlockMatrixPartitioner) = {
+      rdd: RDD[((Int, Int), Matrix)],
+      rowPerBlock: Int,
+      colPerBlock: Int) = {
     this(numRowBlocks, numColBlocks, rdd)
-    setPartitioner(partitioner)
+    val part = new GridPartitioner(numRowBlocks, numColBlocks, rowPerBlock, colPerBlock, rdd.partitions.length)
+    setPartitioner(part)
   }
 
-  private[mllib] var partitioner: BlockMatrixPartitioner = {
-    val firstSubMatrix = rdd.first().mat
+  private[mllib] var partitioner: GridPartitioner = {
+    val firstSubMatrix = rdd.first()._2
     new GridPartitioner(numRowBlocks, numColBlocks,
-      firstSubMatrix.numRows, firstSubMatrix.numCols)
+      firstSubMatrix.numRows, firstSubMatrix.numCols, rdd.partitions.length)
   }
 
   /**
    * Set the partitioner for the matrix. For internal use only. Users should use `repartition`.
    * @param part A partitioner that specifies how SubMatrices are stored in the cluster
    */
-  private def setPartitioner(part: BlockMatrixPartitioner): Unit = {
+  private def setPartitioner(part: GridPartitioner): Unit = {
     partitioner = part
   }
 
-  // A key-value pair RDD is required to partition properly
-  private var matrixRDD: RDD[(Int, SubMatrix)] = keyBy()
-
   private lazy val dims: (Long, Long) = getDim
 
   override def numRows(): Long = dims._1
   override def numCols(): Long = dims._2
 
-  if (partitioner.name.equals("column")) {
-    require(numColBlocks == partitioner.numPartitions, "The number of column blocks should match" +
-      s" the number of partitions of the column partitioner. numColBlocks: $numColBlocks, " +
-      s"partitioner.numPartitions: ${partitioner.numPartitions}")
-  } else if (partitioner.name.equals("row")) {
-    require(numRowBlocks == partitioner.numPartitions, "The number of row blocks should match" +
-      s" the number of partitions of the row partitioner. numRowBlocks: $numRowBlocks, " +
-      s"partitioner.numPartitions: ${partitioner.numPartitions}")
-  } else if (partitioner.name.equals("grid")) {
-    require(numRowBlocks * numColBlocks == partitioner.numPartitions, "The number of blocks " +
-      s"should match the number of partitions of the grid partitioner. numRowBlocks * " +
-      s"numColBlocks: ${numRowBlocks * numColBlocks}, " +
-      s"partitioner.numPartitions: ${partitioner.numPartitions}")
-  } else {
-    throw new IllegalArgumentException("Unrecognized partitioner.")
-  }
-
   /** Returns the dimensions of the matrix. */
   def getDim: (Long, Long) = {
-
-    val firstRowColumn = rdd.filter(block => block.blockRowIndex == 0 || block.blockColIndex == 0).
-      map { block =>
-        ((block.blockRowIndex, block.blockColIndex), (block.mat.numRows, block.mat.numCols))
+    // picks the sizes of the matrix with the maximum indices
+    def pickSizeByGreaterIndex(example: (Int, Int, Int, Int), base: (Int, Int, Int, Int)): (Int, Int, Int, Int) = {
+      if (example._1 > base._1 && example._2 > base._2) {
+        (example._1, example._2, example._3, example._4)
+      } else if (example._1 > base._1) {
+        (example._1, base._2, example._3, base._4)
+      } else if (example._2 > base._2) {
+        (base._1, example._2, base._3, example._4)
+      } else {
+        (base._1, base._2, base._3, base._4)
       }
+    }
 
-    firstRowColumn.treeAggregate((0L, 0L))(
-      seqOp = (c, v) => (c, v) match { case ((x_dim, y_dim), ((indX, indY), (nRow, nCol))) =>
-        if (indX == 0 && indY == 0) {
-          (x_dim + nRow, y_dim + nCol)
-        } else if (indX == 0) {
-          (x_dim, y_dim + nCol)
-        } else {
-          (x_dim + nRow, y_dim)
-        }
+    val lastRowCol = rdd.treeAggregate((0, 0, 0, 0))(
+      seqOp = (c, v) => (c, v) match { case (base, ((blockXInd, blockYInd), mat)) =>
+        pickSizeByGreaterIndex((blockXInd, blockYInd, mat.numRows, mat.numCols), base)
       },
       combOp = (c1, c2) => (c1, c2) match {
-        case ((x_dim1, y_dim1), (x_dim2, y_dim2)) =>
-          (x_dim1 + x_dim2, y_dim1 + y_dim2)
+        case (res1, res2) =>
+          pickSizeByGreaterIndex(res1, res2)
       })
+
+    (lastRowCol._1.toLong * partitioner.rowPerBlock + lastRowCol._3,
+      lastRowCol._2.toLong * partitioner.colPerBlock + lastRowCol._4)
   }
 
   /** Returns the Frobenius Norm of the matrix */
   def normFro(): Double = {
-    math.sqrt(rdd.map(lm => lm.mat.values.map(x => math.pow(x, 2)).sum).reduce(_ + _))
+    math.sqrt(rdd.map {
+      case sparse: ((Int, Int), SparseMatrix) =>
+        sparse._2.values.map(x => math.pow(x, 2)).sum
+      case dense: ((Int, Int), DenseMatrix) =>
+        dense._2.values.map(x => math.pow(x, 2)).sum
+    }.reduce(_ + _))
   }
 
   /** Cache the underlying RDD. */
   def cache(): DistributedMatrix = {
-    matrixRDD.cache()
+    rdd.cache()
     this
   }
 
   /** Set the storage level for the underlying RDD. */
   def persist(storageLevel: StorageLevel): DistributedMatrix = {
-    matrixRDD.persist(storageLevel)
-    this
-  }
-
-  /** Add a key to the underlying rdd for partitioning and joins. */
-  private def keyBy(part: BlockMatrixPartitioner = partitioner): RDD[(Int, SubMatrix)] = {
-    rdd.map { block =>
-      part match {
-        case r: RowBasedPartitioner => (block.blockRowIndex, block)
-        case c: ColumnBasedPartitioner => (block.blockColIndex, block)
-        case g: GridPartitioner => (block.blockRowIndex + numRowBlocks * block.blockColIndex, block)
-        case _ => throw new IllegalArgumentException("Unrecognized partitioner")
-      }
-    }
-  }
-
-  /**
-   * Repartition the BlockMatrix using a different partitioner.
-   *
-   * @param part The partitioner to partition by
-   * @return The repartitioned BlockMatrix
-   */
-  def repartition(part: BlockMatrixPartitioner): DistributedMatrix = {
-    matrixRDD = keyBy(part)
-    setPartitioner(part)
+    rdd.persist(storageLevel)
     this
   }
 
   /** Collect the distributed matrix on the driver. */
-  def collect(): DenseMatrix = {
-    val parts = rdd.map(x => ((x.blockRowIndex, x.blockColIndex), x.mat)).
-      collect().sortBy(x => (x._1._2, x._1._1))
+  def toLocalMatrix(): Matrix = {
+    val parts = rdd.collect().sortBy(x => (x._1._2, x._1._1))
     val nRows = numRows().toInt
     val nCols = numCols().toInt
     val values = new Array[Double](nRows * nCols)
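
On the "more robust size calculation" from the commit message: getDim no longer sums block sizes along the first row and column of blocks; it tracks the greatest block row index together with that block's numRows, and the greatest block column index together with that block's numCols, then computes lastRowIndex * rowPerBlock + numRows (and the column analogue). Below is a self-contained sketch of that arithmetic with made-up block metadata, using a plain reduce in place of treeAggregate and no Spark dependency.

// Self-contained sketch of the new size calculation. Each tuple is
// (blockRowIndex, blockColIndex, numRows, numCols) for one block.
object SizeCalcSketch {
  def pickSizeByGreaterIndex(example: (Int, Int, Int, Int), base: (Int, Int, Int, Int)): (Int, Int, Int, Int) = {
    if (example._1 > base._1 && example._2 > base._2) (example._1, example._2, example._3, example._4)
    else if (example._1 > base._1) (example._1, base._2, example._3, base._4)
    else if (example._2 > base._2) (base._1, example._2, base._3, example._4)
    else base
  }

  def main(args: Array[String]): Unit = {
    // A 2 x 2 grid of blocks; regular blocks are 3 x 4, the last row/column of blocks is ragged.
    val blocks = Seq((0, 0, 3, 4), (1, 0, 2, 4), (0, 1, 3, 1), (1, 1, 2, 1))
    val last = blocks.reduce((base, example) => pickSizeByGreaterIndex(example, base))
    val (rowPerBlock, colPerBlock) = (3, 4)
    val numRows = last._1.toLong * rowPerBlock + last._3 // 1 * 3 + 2 = 5
    val numCols = last._2.toLong * colPerBlock + last._4 // 1 * 4 + 1 = 5
    println(s"numRows = $numRows, numCols = $numCols")
  }
}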
@@ -301,7 +194,7 @@ class BlockMatrix(
         val indStart = (j + colStart) * nRows + rowStart
         val indEnd = block.numRows
         val matStart = j * block.numRows
-        val mat = block.values
+        val mat = block.toArray
         while (i < indEnd) {
           values(indStart + i) = mat(matStart + i)
           i += 1
@@ -316,7 +209,7 @@ class BlockMatrix(
 
   /** Collects data and assembles a local dense breeze matrix (for test only). */
   private[mllib] def toBreeze(): BDM[Double] = {
-    val localMat = collect()
-    new BDM[Double](localMat.numRows, localMat.numCols, localMat.values)
+    val localMat = toLocalMatrix()
+    new BDM[Double](localMat.numRows, localMat.numCols, localMat.toArray)
   }
 }
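
For orientation, here is a rough usage sketch of the interface as it stands after this commit. It is based only on the diff above (the API is still work in progress and may change), and the example data, object name, and local master setting are made up.

// Rough usage sketch under the assumptions stated above.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{Matrices, Matrix}
import org.apache.spark.mllib.linalg.distributed.BlockMatrix

object BlockMatrixExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("BlockMatrixExample").setMaster("local[2]"))

    // ((blockRowIndex, blockColIndex), localMatrix) entries: the new SubMatrix shape.
    val blocks: Seq[((Int, Int), Matrix)] = Seq(
      ((0, 0), Matrices.dense(2, 2, Array(1.0, 2.0, 3.0, 4.0))),
      ((0, 1), Matrices.dense(2, 2, Array(5.0, 6.0, 7.0, 8.0))),
      ((1, 0), Matrices.dense(2, 2, Array(9.0, 10.0, 11.0, 12.0))),
      ((1, 1), Matrices.dense(2, 2, Array(13.0, 14.0, 15.0, 16.0))))

    // A 2 x 2 grid of 2 x 2 blocks, so the full matrix should report 4 x 4.
    val mat = new BlockMatrix(2, 2, sc.parallelize(blocks, 2))

    println(s"dims = ${mat.numRows()} x ${mat.numCols()}")
    println(mat.toLocalMatrix())

    sc.stop()
  }
}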
