@@ -10,95 +10,51 @@ import org.apache.spark.ml.image.ImageSchema._
 import java.io.PrintWriter
 import _root_.Utils.FileUtils
 import java.nio.charset.Charset
-
-object SparkJob extends Job {
-  var inputPathImage = "file:///C://data/img_noisy1.png"
-  var outputPathImage = "file:///C://data/out.png"
-  var outputPathJson = "file:///C://data/report.json"
-
-  var padding = 10
-  var subHeight = 300
-  var subWidth = 300
-  var denoiserRuns = 100
-
-  var debug = 1
-
-  val usage = "\nUsage: [--sub_matrix_size] [--padding] [--denoiser_runs] [--debug] [--output_file_json] [--output_file_image] input_file_image\n"
-  def main(args: Array[String]): Unit = {
-    // Check arguments
-    if (args.length % 2 == 0) {
-      println(usage)
-      return
-    }
-    args.sliding(2, 2).toList.collect {
-      case Array("--sub_matrix_size", m_size: String) => {
-        subHeight = m_size.toInt
-        subWidth = m_size.toInt
-      }
-      case Array("--padding", p: String) => padding = p.toInt
-      case Array("--denoiser_runs", runs: String) => denoiserRuns = runs.toInt
-      case Array("--debug", d: String) => debug = d.toInt
-      case Array("--output_file_json", out: String) => outputPathJson = out
-      case Array("--output_file_image", out: String) => outputPathImage = out
-      case Array(out: String) => inputPathImage = out
-    }
-
-    println(s"\nInput file: ${inputPathImage}")
-    println(s"Output file: ${outputPathImage}")
-    println(s"Output json: ${outputPathJson}")
-    println(s"Sub matrix size: ${subHeight}")
-    println(s"Padding: ${padding}")
-
-    println("\nStart")
-    val t = Utils.time(run)
-    if (debug > 0)
-      println(s"Time: $t ms")
-
-    val json = "{\"time\":" + t + "}"
-    val os = FileUtils.getOutputStream(outputPathJson)
-    os.write(json.getBytes(Charset.forName("UTF-8")))
-    os.close()
-
-  }
-
-  def run(): Unit = {
+import java.io.InputStream
+import java.io.OutputStream
+import Pipelines.Pipeline
+import scala.collection.parallel.immutable.ParSeq
+import org.apache.spark.HashPartitioner
+import org.apache.spark.storage.StorageLevel
+
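+// Distributed Gibbs denoiser job: splits a pixel matrix into overlapping tiles,
+// runs a Pipeline over each tile in parallel on Spark, and reassembles the
+// result. `padding` is the per-side tile overlap in pixels; `denoiserRuns` is
+// the number of sampler iterations (presumably consumed by the Pipeline);
+// `debug` > 0 enables progress logging.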
+class SparkJob(val padding: Int = 3,
+               val subHeight: Int = 100,
+               val subWidth: Int = 100,
+               val denoiserRuns: Int = 80,
+               val debug: Int = 1) extends Serializable {
+
+  /**
+    * Runs the job on the Spark cluster, applying the given Pipeline to every tile.
+    */
+  def run(inputMatrix: BDM[Double], pipeline: Pipeline): BDM[Double] = {
+    // Spark setup
     val conf = new SparkConf().setAppName("GibbsDenoiser")
     // .setMaster("local[*]")
     conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-    conf.registerKryoClasses(Array(classOf[Tuple2[Tuple2[Int, Int], Matrix]]))
-
+
     val sc = new SparkContext(conf)
 
-    val inputImage = new Image()
-    val is = FileUtils.getInputStream(inputPathImage)
-    val pixelArray = inputImage.getPixelMatrix(is, true)
-    is.close()
-    val pixelMatrix = new BDM[Double](inputImage.width, inputImage.height, pixelArray.map(_.toDouble))
+    // Split the big matrix into sub-matrices of size subHeight x subWidth
+    val splitted = splitImage(inputMatrix)
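+    // Each tile carries a `padding`-pixel border of neighbouring context;
+    // removePadding crops that border again after the pipeline has run.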
 
-    val splitted = splitImage(pixelMatrix)
-
-    var n = (pixelMatrix.cols) / subWidth // cols divisions
-    var m = (pixelMatrix.rows) / subHeight // rows divisions
-
-    val mat = sc.parallelize(splitted, n * m * 100)
-    val computed = compute(mat, processPipelne)
+    // Make an RDD of the tiles, partitioned by block key across n * m partitions.
+    // Note: partitionBy returns a new RDD, so the repartitioned, cached handle
+    // must be kept and passed on.
+    // val matrixes = sc.parallelize(splitted._1, splitted._2 * splitted._3 * 100)
+    val matrixes = sc.parallelize(splitted._1)
+      .partitionBy(new HashPartitioner(splitted._2 * splitted._3))
+      .persist(StorageLevel.MEMORY_ONLY)
+    val computed = compute(matrixes, pipeline)
 
+    // Reassemble the full matrix from the computed tiles
     val blockMat = new BlockMatrix(computed, subHeight, subWidth)
     val out = Utils.matrixAsBreeze(blockMat.toLocalMatrix())
-    val cleaned = out(0 to pixelMatrix.rows - 1, 0 to pixelMatrix.cols - 1).copy
-    println("It's all ok")
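+    // toLocalMatrix() collects the stitched result on the driver, so the full
+    // image must fit in driver memory.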
 
-    val os = FileUtils.getOutputStream(outputPathImage)
-    val outputImage = new Image()
-    outputImage.setPixelMatrix(cleaned.data.map(_.toInt), cleaned.rows, cleaned.cols, true)
-    outputImage.saveImage(os)
-    os.close()
+    // Crop the reassembled matrix back to the original input size
+    out(0 to inputMatrix.rows - 1, 0 to inputMatrix.cols - 1).copy
 
     // edges.partitionBy(new RangePartitioner(SparkContextSingleton.DEFAULT_PARALLELISM, edges)).persist(StorageLevel.MEMORY_AND_DISK)
   }
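+
+  // Hypothetical usage sketch (names below are illustrative, not from this
+  // commit): given a Breeze matrix and any Pipelines.Pipeline implementation,
+  //   val job      = new SparkJob(padding = 3, subHeight = 100, subWidth = 100)
+  //   val denoised = job.run(noisyPixelMatrix, somePipeline)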
 
 
-  private def splitImage(pixelMatrix: BDM[Double]): Seq[((Int, Int), Matrix)] = {
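+  // Splits pixelMatrix into padded tiles keyed by (blockRow, blockCol) and now
+  // also returns the column (n) and row (m) tile counts used for partitioning
+  // and reassembly.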
+  private def splitImage(pixelMatrix: BDM[Double]): (Seq[((Int, Int), Matrix)], Int, Int) = {
     val subHeight = if (this.subHeight <= 0) pixelMatrix.rows else this.subHeight
     val subWidth = if (this.subWidth <= 0) pixelMatrix.cols else this.subWidth
     assert(padding <= subHeight)
@@ -131,27 +87,28 @@ object SparkJob extends Job {
       println("x sub-matrix: " + n)
       println("y sub-matrix: " + m)
     }
-    for {
-      p1 <- 0 until n // X
-      p2 <- 0 until m // Y
+    (for {
+      p1 <- (0 until n) // X
+      p2 <- (0 until m) // Y
     } yield {
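+      // Each padded tile spans subWidth + 2*padding columns and
+      // subHeight + 2*padding rows of the padded source matrix.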
       val xFromPadded = p1 * subWidth
       val xToPadded = xFromPadded + subWidth + padding * 2 - 1
       val yFromPadded = p2 * subHeight
       val yToPadded = yFromPadded + subHeight + padding * 2 - 1
       val matrix = paddedMatrix(yFromPadded to yToPadded, xFromPadded to xToPadded).copy
       ((p2, p1), Utils.matrixFromBreeze(matrix))
-    }
+    }, n, m)
 
 
-  private def compute(matrixes: RDD[((Int, Int), Matrix)], transform: (BDM[Double]) => (BDM[Double])): RDD[((Int, Int), Matrix)] = {
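+  // Applies the pipeline to every tile and strips the padding border, keeping
+  // the (blockRow, blockCol) keys so run() can reassemble the BlockMatrix.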
+  private def compute(matrixes: RDD[((Int, Int), Matrix)], pipeline: Pipeline): RDD[((Int, Int), Matrix)] = {
+    // TODO: consider mapPartitions to cut per-element closure overhead
     matrixes.map(element => {
       val matrix = Utils.matrixAsBreeze(element._2)
-      val out = transform(matrix)
+      val out = removePadding(pipeline.run(matrix))
       (element._1, Utils.matrixFromBreeze(out))
     })
   }
 
-}
-// sbt "runMain SparkJob ./data/nike_noisy.png"
-// spark-submit --class SparkJob ./jar/binary.jar ./data/nike_noisy.png
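+  // Strips the `padding`-pixel border that splitImage added around each tile.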
+  def removePadding(matrix: BDM[Double]): BDM[Double] =
+    matrix(padding to -padding, padding to -padding).copy
+}