Commit b693209

Author: Burak Yavuz (committed)

Ready for Pull request
1 parent 4afe9a4 commit b693209

File tree

2 files changed (+458, -0 lines)

Lines changed: 338 additions & 0 deletions
@@ -0,0 +1,338 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.mllib.linalg.distributed

import breeze.linalg.{DenseMatrix => BDM}

import org.apache.spark._
import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices}
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

case class BlockPartition(
    blockIdRow: Int,
    blockIdCol: Int,
    mat: DenseMatrix) extends Serializable

// Information about BlockMatrix maintained on the driver
case class BlockPartitionInfo(
    partitionId: Int,
    blockIdRow: Int,
    blockIdCol: Int,
    startRow: Long,
    numRows: Int,
    startCol: Long,
    numCols: Int) extends Serializable

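// A minimal sketch of one block (hypothetical values): a 2 x 2 local matrix
// placed at grid position (blockIdRow = 0, blockIdCol = 1). DenseMatrix stores
// its values in column-major order, so this block is [[1.0, 3.0], [2.0, 4.0]].
//
//   val block = BlockPartition(0, 1, new DenseMatrix(2, 2, Array(1.0, 2.0, 3.0, 4.0)))
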
abstract class BlockMatrixPartitioner(
    override val numPartitions: Int,
    val rowPerBlock: Int,
    val colPerBlock: Int) extends Partitioner {
  val name: String

  override def getPartition(key: Any): Int = {
    Utils.nonNegativeMod(key.asInstanceOf[Int], numPartitions)
  }
}

class GridPartitioner(
    val numRowBlocks: Int,
    val numColBlocks: Int,
    override val rowPerBlock: Int,
    override val colPerBlock: Int)
  extends BlockMatrixPartitioner(numRowBlocks * numColBlocks, rowPerBlock, colPerBlock) {

  override val name = "grid"

  override val numPartitions = numRowBlocks * numColBlocks

  override def equals(obj: Any): Boolean = {
    obj match {
      case r: GridPartitioner =>
        (this.numPartitions == r.numPartitions) && (this.rowPerBlock == r.rowPerBlock) &&
          (this.colPerBlock == r.colPerBlock)
      case _ =>
        false
    }
  }
}

class RowBasedPartitioner(
    override val numPartitions: Int,
    override val rowPerBlock: Int,
    override val colPerBlock: Int)
  extends BlockMatrixPartitioner(numPartitions, rowPerBlock, colPerBlock) {

  override val name = "row"

  override def equals(obj: Any): Boolean = {
    obj match {
      case r: RowBasedPartitioner =>
        (this.numPartitions == r.numPartitions) && (this.rowPerBlock == r.rowPerBlock) &&
          (this.colPerBlock == r.colPerBlock)
      case _ =>
        false
    }
  }
}

class ColumnBasedPartitioner(
    override val numPartitions: Int,
    override val rowPerBlock: Int,
    override val colPerBlock: Int)
  extends BlockMatrixPartitioner(numPartitions, rowPerBlock, colPerBlock) {

  override val name = "column"

  override def equals(obj: Any): Boolean = {
    obj match {
      case p: ColumnBasedPartitioner =>
        (this.numPartitions == p.numPartitions) && (this.rowPerBlock == p.rowPerBlock) &&
          (this.colPerBlock == p.colPerBlock)
      case r: RowBasedPartitioner =>
        // A column-based partitioner also matches a row-based one when the block
        // dimensions line up, which is what block multiplication requires.
        (this.numPartitions == r.numPartitions) && (this.colPerBlock == r.rowPerBlock)
      case _ =>
        false
    }
  }
}

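// A minimal usage sketch (hypothetical sizes). For a matrix split into a 3 x 2
// grid of blocks, each block holding 1024 x 1024 entries, a GridPartitioner
// gives one partition per block, while a ColumnBasedPartitioner keeps each
// column of blocks together:
//
//   val gridPart = new GridPartitioner(3, 2, 1024, 1024)     // 6 partitions
//   val colPart = new ColumnBasedPartitioner(2, 1024, 1024)  // 2 partitions
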
class BlockMatrix(
    val numRowBlocks: Int,
    val numColBlocks: Int,
    val rdd: RDD[BlockPartition],
    val partitioner: BlockMatrixPartitioner) extends DistributedMatrix with Logging {

  // We need a key-value pair RDD to partition properly
  private var matrixRDD = rdd.map { block =>
    partitioner match {
      case r: RowBasedPartitioner => (block.blockIdRow, block)
      case c: ColumnBasedPartitioner => (block.blockIdCol, block)
      case g: GridPartitioner => (block.blockIdRow + numRowBlocks * block.blockIdCol, block)
      case _ => throw new IllegalArgumentException("Unrecognized partitioner")
    }
  }

  @transient var blockInfo_ : Map[(Int, Int), BlockPartitionInfo] = null

  lazy val dims: (Long, Long) = getDim

  override def numRows(): Long = dims._1
  override def numCols(): Long = dims._2

  if (partitioner.name.equals("column")) {
    require(numColBlocks == partitioner.numPartitions)
  } else if (partitioner.name.equals("row")) {
    require(numRowBlocks == partitioner.numPartitions)
  } else if (partitioner.name.equals("grid")) {
    require(numRowBlocks * numColBlocks == partitioner.numPartitions)
  } else {
    throw new IllegalArgumentException("Unrecognized partitioner.")
  }

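  // Worked example of the grid key above (hypothetical sizes): with
  // numRowBlocks = 3, the block at (blockIdRow = 1, blockIdCol = 2) maps to
  // key 1 + 3 * 2 = 7, i.e. blocks are linearized in column-major order.
  // multiply() below inverts this with colId = key / numRowBlocks and
  // rowId = key - colId * numRowBlocks.
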
  def getDim: (Long, Long) = {
    val bi = getBlockInfo
    val xDim = bi.map { x =>
      (x._1._1, x._2.numRows.toLong)
    }.groupBy(x => x._1).values.map { x =>
      x.head._2.toLong
    }.reduceLeft {
      _ + _
    }

    val yDim = bi.map { x =>
      (x._1._2, x._2.numCols.toLong)
    }.groupBy(x => x._1).values.map { x =>
      x.head._2.toLong
    }.reduceLeft {
      _ + _
    }

    (xDim, yDim)
  }

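  // How getDim adds up (hypothetical sizes): for a 2 x 2 grid of blocks whose
  // block rows hold 3 and 5 matrix rows respectively, grouping blockInfo by
  // block-row id and taking one representative per group yields
  // xDim = 3 + 5 = 8. yDim is computed the same way over block columns.
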
  private def calculateBlockInfo(): Unit = {
    // collect() may cause Akka frameSize errors
    val blockStartRowColsParts = matrixRDD.mapPartitionsWithIndex { case (partId, iter) =>
      iter.map { case (id, block) =>
        ((block.blockIdRow, block.blockIdCol), (partId, block.mat.numRows, block.mat.numCols))
      }
    }.collect()
    val blockStartRowCols = blockStartRowColsParts.sortBy(_._1)

    // Group blockInfo by rowId, pick the first block in each row, and sort on rowId
    val rowReps = blockStartRowCols.groupBy(_._1._1).values.map(_.head).toSeq.sortBy(_._1._1)

    // Group blockInfo by columnId, pick the first block in each column, and sort on columnId
    val colReps = blockStartRowCols.groupBy(_._1._2).values.map(_.head).toSeq.sortBy(_._1._2)

    // Calculate the cumulative row offsets (startRow for each block row)
    val cumulativeRowSum = rowReps.scanLeft((0, 0L)) { case (x1, x2) =>
      (x1._1 + 1, x1._2 + x2._2._2)
    }.toMap

    // Calculate the cumulative column offsets (startCol for each block column)
    val cumulativeColSum = colReps.scanLeft((0, 0L)) { case (x1, x2) =>
      (x1._1 + 1, x1._2 + x2._2._3)
    }.toMap

    blockInfo_ = blockStartRowCols.map { case ((rowId, colId), (partId, numRow, numCol)) =>
      ((rowId, colId), new BlockPartitionInfo(partId, rowId, colId, cumulativeRowSum(rowId),
        numRow, cumulativeColSum(colId), numCol))
    }.toMap
  }

  def getBlockInfo: Map[(Int, Int), BlockPartitionInfo] = {
    if (blockInfo_ == null) {
      calculateBlockInfo()
    }
    blockInfo_
  }

  def normFro(): Double = {
    math.sqrt(rdd.map(lm => lm.mat.values.map(x => math.pow(x, 2)).sum).reduce(_ + _))
  }

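  // Worked example for normFro() (hypothetical entries): for a matrix with
  // entries {1, 2, 2}, the Frobenius norm is sqrt(1^2 + 2^2 + 2^2) = sqrt(9) = 3.
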
  /** Cache the underlying RDD. */
  def cache(): DistributedMatrix = {
    matrixRDD.cache()
    this
  }

  /** Set the storage level for the underlying RDD. */
  def persist(storageLevel: StorageLevel): DistributedMatrix = {
    matrixRDD.persist(storageLevel)
    this
  }

  def repartition(part: BlockMatrixPartitioner = partitioner): DistributedMatrix = {
    matrixRDD = matrixRDD.partitionBy(part)
    this
  }

  /** Collect the distributed matrix on the driver. */
  def collect(): DenseMatrix = {
    val parts = rdd.map(x => ((x.blockIdRow, x.blockIdCol), x.mat)).
      collect().sortBy(x => (x._1._2, x._1._1))
    val nRows = numRows().toInt
    val nCols = numCols().toInt
    val values = new Array[Double](nRows * nCols)
    val blockInfos = getBlockInfo
    parts.foreach { part =>
      val blockInfo = blockInfos((part._1._1, part._1._2))
      // Figure out where this part should be put
      var j = 0
      while (j < blockInfo.numCols) {
        var i = 0
        val indStart = (j + blockInfo.startCol.toInt) * nRows + blockInfo.startRow.toInt
        val indEnd = blockInfo.numRows
        val matStart = j * blockInfo.numRows
        val mat = part._2.values
        while (i < indEnd) {
          values(indStart + i) = mat(matStart + i)
          i += 1
        }
        j += 1
      }
    }
    new DenseMatrix(nRows, nCols, values)
  }

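  // A minimal sketch of collect() (assumes the whole matrix fits on the
  // driver, `blockMat` being a hypothetical BlockMatrix): every block is
  // fetched and copied, column by column, into one column-major array using
  // its startRow/startCol offsets.
  //
  //   val local: DenseMatrix = blockMat.collect()
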
  private[mllib] def toBreeze(): BDM[Double] = {
    val localMat = collect()
    new BDM[Double](localMat.numRows, localMat.numCols, localMat.values)
  }

  def add(other: DistributedMatrix): DistributedMatrix = {
    other match {
      // We really need a function to check if two matrices are partitioned similarly
      case otherBlocked: BlockMatrix =>
        if (checkPartitioning(otherBlocked, OperationNames.add)) {
          val addedBlocks = rdd.zip(otherBlocked.rdd).map { case (a, b) =>
            val result = a.mat.toBreeze + b.mat.toBreeze
            new BlockPartition(a.blockIdRow, a.blockIdCol,
              Matrices.fromBreeze(result).asInstanceOf[DenseMatrix])
          }
          new BlockMatrix(numRowBlocks, numColBlocks, addedBlocks, partitioner)
        } else {
          throw new SparkException(
            "Cannot add matrices with non-matching partitioners")
        }
      case _ =>
        throw new IllegalArgumentException("Cannot add matrices of different types")
    }
  }

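  // A minimal usage sketch for add() (assumes `A` and `B` are hypothetical
  // BlockMatrix instances with equal partitioners and block layouts):
  //
  //   val sum = A.add(B)   // adds blocks pairwise via zip, keeping the partitioning
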
  def multiply(other: DistributedMatrix): BlockMatrix = {
    other match {
      case otherBlocked: BlockMatrix =>
        if (checkPartitioning(otherBlocked, OperationNames.multiply)) {
          val resultPartitioner = new GridPartitioner(numRowBlocks, otherBlocked.numColBlocks,
            partitioner.rowPerBlock, otherBlocked.partitioner.colPerBlock)

          val multiplyBlocks = matrixRDD.join(otherBlocked.matrixRDD, partitioner).
            map { case (key, (mat1, mat2)) =>
              val C = mat1.mat multiply mat2.mat
              (mat1.blockIdRow + numRowBlocks * mat2.blockIdCol, C.toBreeze)
            }.reduceByKey(resultPartitioner, (a, b) => a + b)

          val newBlocks = multiplyBlocks.map { case (index, mat) =>
            val colId = index / numRowBlocks
            val rowId = index - colId * numRowBlocks
            new BlockPartition(rowId, colId, Matrices.fromBreeze(mat).asInstanceOf[DenseMatrix])
          }
          new BlockMatrix(numRowBlocks, otherBlocked.numColBlocks, newBlocks, resultPartitioner)
        } else {
          throw new SparkException(
            "Cannot multiply matrices with non-matching partitioners")
        }
      case _ =>
        throw new IllegalArgumentException("Cannot multiply matrices of different types")
    }
  }

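  // A minimal usage sketch for multiply() (hypothetical sizes; `aBlocks` and
  // `bBlocks` are assumed RDD[BlockPartition]s): the left matrix must be
  // column-partitioned and the right one row-partitioned, with the left
  // colPerBlock equal to the right rowPerBlock.
  //
  //   val A = new BlockMatrix(2, 3, aBlocks, new ColumnBasedPartitioner(3, 1024, 512))
  //   val B = new BlockMatrix(3, 4, bBlocks, new RowBasedPartitioner(3, 512, 256))
  //   val C = A.multiply(B)   // 2 x 4 blocks, grid-partitioned
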
  private def checkPartitioning(other: BlockMatrix, operation: Int): Boolean = {
    val otherPartitioner = other.partitioner
    operation match {
      case OperationNames.add =>
        partitioner.equals(otherPartitioner)
      case OperationNames.multiply =>
        partitioner.name == "column" && otherPartitioner.name == "row" &&
          partitioner.numPartitions == otherPartitioner.numPartitions &&
          partitioner.colPerBlock == otherPartitioner.rowPerBlock &&
          numColBlocks == other.numRowBlocks
      case _ =>
        throw new IllegalArgumentException("Unsupported operation")
    }
  }
}

/**
 * Maintains supported and default block matrix operation names.
 *
 * Currently supported operations: `add`, `multiply`.
 */
private object OperationNames {

  val add: Int = 1
  val multiply: Int = 2

}

0 commit comments