
Commit fa408a8

Merge remote-tracking branch 'upstream/master' into ldaonline

2 parents 45884ab + 56aff4b

File tree

14 files changed (+186 lines, -36 lines)


core/src/main/scala/org/apache/spark/ui/JettyUtils.scala

Lines changed: 16 additions & 11 deletions
@@ -62,17 +62,22 @@ private[spark] object JettyUtils extends Logging {
       securityMgr: SecurityManager): HttpServlet = {
     new HttpServlet {
       override def doGet(request: HttpServletRequest, response: HttpServletResponse) {
-        if (securityMgr.checkUIViewPermissions(request.getRemoteUser)) {
-          response.setContentType("%s;charset=utf-8".format(servletParams.contentType))
-          response.setStatus(HttpServletResponse.SC_OK)
-          val result = servletParams.responder(request)
-          response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
-          response.getWriter.println(servletParams.extractFn(result))
-        } else {
-          response.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
-          response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
-          response.sendError(HttpServletResponse.SC_UNAUTHORIZED,
-            "User is not authorized to access this page.")
+        try {
+          if (securityMgr.checkUIViewPermissions(request.getRemoteUser)) {
+            response.setContentType("%s;charset=utf-8".format(servletParams.contentType))
+            response.setStatus(HttpServletResponse.SC_OK)
+            val result = servletParams.responder(request)
+            response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
+            response.getWriter.println(servletParams.extractFn(result))
+          } else {
+            response.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
+            response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
+            response.sendError(HttpServletResponse.SC_UNAUTHORIZED,
+              "User is not authorized to access this page.")
+          }
+        } catch {
+          case e: IllegalArgumentException =>
+            response.sendError(HttpServletResponse.SC_BAD_REQUEST, e.getMessage)
         }
       }
     }
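Together with the page-level changes below, this means a missing or malformed URL parameter now surfaces as an HTTP 400 instead of an uncaught exception or a silently rendered error page. A minimal sketch of the pattern the UI pages rely on (the page and parameter names here are hypothetical):

    import javax.servlet.http.HttpServletRequest
    import scala.xml.Node

    // Hypothetical render method in the style of the UI pages changed below.
    // A failed require(...) throws IllegalArgumentException, which the servlet
    // wrapper above now converts into a SC_BAD_REQUEST (400) response.
    def render(request: HttpServletRequest): Seq[Node] = {
      val parameterId = request.getParameter("id")
      require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
      <p>Rendering item {parameterId.toInt}</p>
    }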

core/src/main/scala/org/apache/spark/ui/exec/ExecutorThreadDumpPage.scala

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage
       }
       id
     }.getOrElse {
-      return Text(s"Missing executorId parameter")
+      throw new IllegalArgumentException(s"Missing executorId parameter")
     }
     val time = System.currentTimeMillis()
     val maybeThreadDump = sc.get.getExecutorThreadDump(executorId)

core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala

Lines changed: 4 additions & 1 deletion
@@ -32,7 +32,10 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {

   def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
-      val jobId = request.getParameter("id").toInt
+      val parameterId = request.getParameter("id")
+      require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
+
+      val jobId = parameterId.toInt
       val jobDataOption = listener.jobIdToData.get(jobId)
       if (jobDataOption.isEmpty) {
         val content =

core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala

Lines changed: 2 additions & 0 deletions
@@ -32,6 +32,8 @@ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") {
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
       val poolName = request.getParameter("poolname")
+      require(poolName != null && poolName.nonEmpty, "Missing poolname parameter")
+
       val poolToActiveStages = listener.poolToActiveStages
       val activeStages = poolToActiveStages.get(poolName) match {
         case Some(s) => s.values.toSeq

core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

Lines changed: 8 additions & 2 deletions
@@ -36,8 +36,14 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {

   def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
-      val stageId = request.getParameter("id").toInt
-      val stageAttemptId = request.getParameter("attempt").toInt
+      val parameterId = request.getParameter("id")
+      require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
+
+      val parameterAttempt = request.getParameter("attempt")
+      require(parameterAttempt != null && parameterAttempt.nonEmpty, "Missing attempt parameter")
+
+      val stageId = parameterId.toInt
+      val stageAttemptId = parameterAttempt.toInt
       val stageDataOption = listener.stageIdToData.get((stageId, stageAttemptId))

       if (stageDataOption.isEmpty || stageDataOption.get.taskData.isEmpty) {

core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala

Lines changed: 4 additions & 1 deletion
@@ -30,7 +30,10 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
   private val listener = parent.listener

   def render(request: HttpServletRequest): Seq[Node] = {
-    val rddId = request.getParameter("id").toInt
+    val parameterId = request.getParameter("id")
+    require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
+
+    val rddId = parameterId.toInt
     val storageStatusList = listener.storageStatusList
     val rddInfo = listener.rddInfoList.find(_.id == rddId).getOrElse {
       // Rather than crashing, render an "RDD Not Found" page

docs/mllib-guide.md

Lines changed: 24 additions & 17 deletions
@@ -56,25 +56,32 @@ See the **[spark.ml programming guide](ml-guide.html)** for more information on

 # Dependencies

-MLlib uses the linear algebra package [Breeze](http://www.scalanlp.org/),
-which depends on [netlib-java](https://github.com/fommil/netlib-java),
-and [jblas](https://github.com/mikiobraun/jblas).
-`netlib-java` and `jblas` depend on native Fortran routines.
-You need to install the
+MLlib uses the linear algebra package
+[Breeze](http://www.scalanlp.org/), which depends on
+[netlib-java](https://github.com/fommil/netlib-java) for optimised
+numerical processing. If natives are not available at runtime, you
+will see a warning message and a pure JVM implementation will be used
+instead.
+
+To learn more about the benefits and background of system optimised
+natives, you may wish to watch Sam Halliday's ScalaX talk on
+[High Performance Linear Algebra in Scala](http://fommil.github.io/scalax14/#/)).
+
+Due to licensing issues with runtime proprietary binaries, we do not
+include `netlib-java`'s native proxies by default. To configure
+`netlib-java` / Breeze to use system optimised binaries, include
+`com.github.fommil.netlib:all:1.1.2` (or build Spark with
+`-Pnetlib-lgpl`) as a dependency of your project and read the
+[netlib-java](https://github.com/fommil/netlib-java) documentation for
+your platform's additional installation instructions.
+
+MLlib also uses [jblas](https://github.com/mikiobraun/jblas) which
+will require you to install the
 [gfortran runtime library](https://github.com/mikiobraun/jblas/wiki/Missing-Libraries)
 if it is not already present on your nodes.
-MLlib will throw a linking error if it cannot detect these libraries automatically.
-Due to license issues, we do not include `netlib-java`'s native libraries in MLlib's
-dependency set under default settings.
-If no native library is available at runtime, you will see a warning message.
-To use native libraries from `netlib-java`, please build Spark with `-Pnetlib-lgpl` or
-include `com.github.fommil.netlib:all:1.1.2` as a dependency of your project.
-If you want to use optimized BLAS/LAPACK libraries such as
-[OpenBLAS](http://www.openblas.net/), please link its shared libraries to
-`/usr/lib/libblas.so.3` and `/usr/lib/liblapack.so.3`, respectively.
-BLAS/LAPACK libraries on worker nodes should be built without multithreading.
-
-To use MLlib in Python, you will need [NumPy](http://www.numpy.org) version 1.4 or newer.
+
+To use MLlib in Python, you will need [NumPy](http://www.numpy.org)
+version 1.4 or newer.

 ---
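For an sbt-based application, the dependency mentioned in the revised guide would look roughly like the line below (a sketch; the `all` artifact is published as a POM-only aggregator, so `pomOnly()` may or may not be needed depending on your build setup):

    // build.sbt -- pulls in netlib-java's native proxies so Breeze can use
    // system-optimised BLAS/LAPACK at runtime (see the guide text above).
    libraryDependencies += "com.github.fommil.netlib" % "all" % "1.1.2" pomOnly()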

ec2/spark_ec2.py

Lines changed: 15 additions & 0 deletions
@@ -24,10 +24,12 @@
 import hashlib
 import logging
 import os
+import os.path
 import pipes
 import random
 import shutil
 import string
+from stat import S_IRUSR
 import subprocess
 import sys
 import tarfile
@@ -349,6 +351,7 @@ def launch_cluster(conn, opts, cluster_name):
     if opts.identity_file is None:
         print >> stderr, "ERROR: Must provide an identity file (-i) for ssh connections."
         sys.exit(1)
+
     if opts.key_pair is None:
         print >> stderr, "ERROR: Must provide a key pair name (-k) to use on instances."
         sys.exit(1)
@@ -1007,6 +1010,18 @@ def real_main():
             DeprecationWarning
         )

+    if opts.identity_file is not None:
+        if not os.path.exists(opts.identity_file):
+            print >> stderr,\
+                "ERROR: The identity file '{f}' doesn't exist.".format(f=opts.identity_file)
+            sys.exit(1)
+
+        file_mode = os.stat(opts.identity_file).st_mode
+        if not (file_mode & S_IRUSR) or not oct(file_mode)[-2:] == '00':
+            print >> stderr, "ERROR: The identity file must be accessible only by you."
+            print >> stderr, 'You can fix this with: chmod 400 "{f}"'.format(f=opts.identity_file)
+            sys.exit(1)
+
     if opts.ebs_vol_num > 8:
         print >> stderr, "ebs-vol-num cannot be greater than 8"
         sys.exit(1)
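The second condition in the new permission check requires the last two octal digits of the file mode to be zero, i.e. no group or world permission bits may be set. A small illustration of the same test, written in Scala for consistency with the rest of the commit (the modes shown are hypothetical examples):

    // Equivalent of the Python check: owner-read bit (S_IRUSR = 0400) must be
    // set and the group/other bits (the last two octal digits) must be clear.
    def accessibleOnlyByOwner(mode: Int): Boolean =
      (mode & Integer.parseInt("400", 8)) != 0 && (mode & Integer.parseInt("77", 8)) == 0

    println(accessibleOnlyByOwner(Integer.parseInt("600", 8)))  // true  (rw-------)
    println(accessibleOnlyByOwner(Integer.parseInt("644", 8)))  // false (rw-r--r--)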

mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala

Lines changed: 3 additions & 0 deletions
@@ -79,6 +79,9 @@ private[mllib] object EigenValueDecomposition {
     // Mode 1: A*x = lambda*x, A symmetric
     iparam(6) = 1

+    require(n * ncv.toLong <= Integer.MAX_VALUE && ncv * (ncv.toLong + 8) <= Integer.MAX_VALUE,
+      s"k = $k and/or n = $n are too large to compute an eigendecomposition")
+
     var ido = new intW(0)
     var info = new intW(0)
     var resid = new Array[Double](n)
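The new guard protects the ARPACK workspace allocations, whose lengths are computed from n and ncv and must fit in a JVM Int. Promoting one operand to Long before multiplying is what makes the comparison meaningful; a quick illustration with hypothetical sizes:

    // Hypothetical sizes, chosen only to show the overflow the require(...) prevents.
    val n = 100000
    val ncv = 30000
    println(n * ncv)         // Int arithmetic overflows: prints -1294967296
    println(n * ncv.toLong)  // Long arithmetic: prints 3000000000, > Integer.MAX_VALUE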

mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala

Lines changed: 1 addition & 1 deletion
@@ -18,7 +18,7 @@
 package org.apache.spark.mllib.recommendation

 import org.apache.spark.Logging
-import org.apache.spark.annotation.{DeveloperApi, Experimental}
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.ml.recommendation.{ALS => NewALS}
 import org.apache.spark.rdd.RDD

mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala

Lines changed: 80 additions & 2 deletions
@@ -17,13 +17,17 @@

 package org.apache.spark.mllib.recommendation

+import java.io.IOException
 import java.lang.{Integer => JavaInteger}

+import org.apache.hadoop.fs.Path
 import org.jblas.DoubleMatrix

-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
+import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.storage.StorageLevel

 /**
@@ -41,7 +45,8 @@ import org.apache.spark.storage.StorageLevel
 class MatrixFactorizationModel(
     val rank: Int,
     val userFeatures: RDD[(Int, Array[Double])],
-    val productFeatures: RDD[(Int, Array[Double])]) extends Serializable with Logging {
+    val productFeatures: RDD[(Int, Array[Double])])
+  extends Saveable with Serializable with Logging {

   require(rank > 0)
   validateFeatures("User", userFeatures)
@@ -125,6 +130,12 @@ class MatrixFactorizationModel(
     recommend(productFeatures.lookup(product).head, userFeatures, num)
       .map(t => Rating(t._1, product, t._2))

+  protected override val formatVersion: String = "1.0"
+
+  override def save(sc: SparkContext, path: String): Unit = {
+    MatrixFactorizationModel.SaveLoadV1_0.save(this, path)
+  }
+
   private def recommend(
       recommendToFeatures: Array[Double],
       recommendableFeatures: RDD[(Int, Array[Double])],
@@ -136,3 +147,70 @@ class MatrixFactorizationModel(
     scored.top(num)(Ordering.by(_._2))
   }
 }
+
+object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
+
+  import org.apache.spark.mllib.util.Loader._
+
+  override def load(sc: SparkContext, path: String): MatrixFactorizationModel = {
+    val (loadedClassName, formatVersion, metadata) = loadMetadata(sc, path)
+    val classNameV1_0 = SaveLoadV1_0.thisClassName
+    (loadedClassName, formatVersion) match {
+      case (className, "1.0") if className == classNameV1_0 =>
+        SaveLoadV1_0.load(sc, path)
+      case _ =>
+        throw new IOException("MatrixFactorizationModel.load did not recognize model with" +
+          s"(class: $loadedClassName, version: $formatVersion). Supported:\n" +
+          s"  ($classNameV1_0, 1.0)")
+    }
+  }
+
+  private[recommendation]
+  object SaveLoadV1_0 {
+
+    private val thisFormatVersion = "1.0"
+
+    private[recommendation]
+    val thisClassName = "org.apache.spark.mllib.recommendation.MatrixFactorizationModel"
+
+    /**
+     * Saves a [[MatrixFactorizationModel]], where user features are saved under `data/users` and
+     * product features are saved under `data/products`.
+     */
+    def save(model: MatrixFactorizationModel, path: String): Unit = {
+      val sc = model.userFeatures.sparkContext
+      val sqlContext = new SQLContext(sc)
+      import sqlContext.implicits.createDataFrame
+      val metadata = (thisClassName, thisFormatVersion, model.rank)
+      val metadataRDD = sc.parallelize(Seq(metadata), 1).toDataFrame("class", "version", "rank")
+      metadataRDD.toJSON.saveAsTextFile(metadataPath(path))
+      model.userFeatures.toDataFrame("id", "features").saveAsParquetFile(userPath(path))
+      model.productFeatures.toDataFrame("id", "features").saveAsParquetFile(productPath(path))
+    }
+
+    def load(sc: SparkContext, path: String): MatrixFactorizationModel = {
+      val sqlContext = new SQLContext(sc)
+      val (className, formatVersion, metadata) = loadMetadata(sc, path)
+      assert(className == thisClassName)
+      assert(formatVersion == thisFormatVersion)
+      val rank = metadata.select("rank").first().getInt(0)
+      val userFeatures = sqlContext.parquetFile(userPath(path))
+        .map { case Row(id: Int, features: Seq[Double]) =>
+          (id, features.toArray)
+        }
+      val productFeatures = sqlContext.parquetFile(productPath(path))
+        .map { case Row(id: Int, features: Seq[Double]) =>
+          (id, features.toArray)
+        }
+      new MatrixFactorizationModel(rank, userFeatures, productFeatures)
+    }
+
+    private def userPath(path: String): String = {
+      new Path(dataPath(path), "user").toUri.toString
+    }
+
+    private def productPath(path: String): String = {
+      new Path(dataPath(path), "product").toUri.toString
+    }
+  }
+}
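A short usage sketch of the new persistence API (assuming an active SparkContext `sc`, a trained `model`, and a writable location; the path below is only an example):

    import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

    // Save the factorization (rank plus user/product feature RDDs as Parquet),
    // then load it back into an equivalent model.
    val path = "hdfs:///tmp/matrixFactorizationModel"   // hypothetical location
    model.save(sc, path)
    val sameModel = MatrixFactorizationModel.load(sc, path)
    assert(sameModel.rank == model.rank)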

mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala

Lines changed: 19 additions & 0 deletions
@@ -22,6 +22,7 @@ import org.scalatest.FunSuite
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.mllib.util.TestingUtils._
 import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils

 class MatrixFactorizationModelSuite extends FunSuite with MLlibTestSparkContext {

@@ -53,4 +54,22 @@ class MatrixFactorizationModelSuite extends FunSuite with MLlibTestSparkContext
       new MatrixFactorizationModel(rank, userFeatures, prodFeatures1)
     }
   }
+
+  test("save/load") {
+    val model = new MatrixFactorizationModel(rank, userFeatures, prodFeatures)
+    val tempDir = Utils.createTempDir()
+    val path = tempDir.toURI.toString
+    def collect(features: RDD[(Int, Array[Double])]): Set[(Int, Seq[Double])] = {
+      features.mapValues(_.toSeq).collect().toSet
+    }
+    try {
+      model.save(sc, path)
+      val newModel = MatrixFactorizationModel.load(sc, path)
+      assert(newModel.rank === rank)
+      assert(collect(newModel.userFeatures) === collect(userFeatures))
+      assert(collect(newModel.productFeatures) === collect(prodFeatures))
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
 }

pom.xml

Lines changed: 4 additions & 0 deletions
@@ -975,6 +975,10 @@
           <groupId>com.esotericsoftware.kryo</groupId>
           <artifactId>kryo</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro-mapred</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 5 additions & 0 deletions
@@ -424,6 +424,11 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   /** Extends QueryExecution with hive specific features. */
   protected[sql] class QueryExecution(logicalPlan: LogicalPlan)
     extends super.QueryExecution(logicalPlan) {
+    // Like what we do in runHive, makes sure the session represented by the
+    // `sessionState` field is activated.
+    if (SessionState.get() != sessionState) {
+      SessionState.start(sessionState)
+    }

     /**
      * Returns the result as a hive compatible sequence of strings. For native commands, the
