
Commit fc73ca5

Merge pull request alteryx#9 from andrewor14/ui-refactor
Merge with master
2 parents: f4f4cbe + 642dd88

136 files changed: 1633 additions & 653 deletions

.rat-excludes

Lines changed: 3 additions & 1 deletion

@@ -39,4 +39,6 @@ work
 .*\.q
 golden
 test.out/*
-.*iml
+.*iml
+python/metastore/service.properties
+python/metastore/db.lck

bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala

Lines changed: 2 additions & 2 deletions

@@ -28,9 +28,9 @@ class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializ
 class TestMessage(val targetId: String) extends Message[String] with Serializable
 
 class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeouts {
-
+
   var sc: SparkContext = _
-
+
   after {
     if (sc != null) {
       sc.stop()

core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java

Lines changed: 1 addition & 1 deletion

@@ -24,4 +24,4 @@
  */
 public interface FlatMapFunction<T, R> extends Serializable {
   public Iterable<R> call(T t) throws Exception;
-}
+}

core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java

Lines changed: 1 addition & 1 deletion

@@ -24,4 +24,4 @@
  */
 public interface FlatMapFunction2<T1, T2, R> extends Serializable {
   public Iterable<R> call(T1 t1, T2 t2) throws Exception;
-}
+}

core/src/main/resources/org/apache/spark/ui/static/webui.css

Lines changed: 9 additions & 0 deletions

@@ -78,3 +78,12 @@ table.sortable thead {
   background-repeat: repeat-x;
   filter: progid:dximagetransform.microsoft.gradient(startColorstr='#FFA4EDFF', endColorstr='#FF94DDFF', GradientType=0);
 }
+
+span.kill-link {
+  margin-right: 2px;
+  color: gray;
+}
+
+span.kill-link a {
+  color: gray;
+}
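
Aside: a hypothetical sketch (not taken from this commit) of the kind of element the new span.kill-link rules would style. The Spark web UI builds its pages from Scala XML literals; the killUrl parameter and the "(kill)" label below are placeholders.

// Hypothetical sketch: an element the span.kill-link CSS above would apply to.
// killUrl and the "(kill)" text are placeholders, not taken from this commit.
def killLink(killUrl: String): scala.xml.Elem =
  <span class="kill-link">
    <a href={killUrl}>(kill)</a>
  </span>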

core/src/main/scala/org/apache/spark/HttpFileServer.scala

Lines changed: 7 additions & 7 deletions

@@ -24,13 +24,13 @@ import com.google.common.io.Files
 import org.apache.spark.util.Utils
 
 private[spark] class HttpFileServer(securityManager: SecurityManager) extends Logging {
-
+
   var baseDir : File = null
   var fileDir : File = null
   var jarDir : File = null
   var httpServer : HttpServer = null
   var serverUri : String = null
-
+
   def initialize() {
     baseDir = Utils.createTempDir()
     fileDir = new File(baseDir, "files")
@@ -43,24 +43,24 @@ private[spark] class HttpFileServer(securityManager: SecurityManager) extends Lo
     serverUri = httpServer.uri
     logDebug("HTTP file server started at: " + serverUri)
   }
-
+
   def stop() {
     httpServer.stop()
   }
-
+
   def addFile(file: File) : String = {
     addFileToDir(file, fileDir)
     serverUri + "/files/" + file.getName
   }
-
+
   def addJar(file: File) : String = {
     addFileToDir(file, jarDir)
     serverUri + "/jars/" + file.getName
   }
-
+
   def addFileToDir(file: File, dir: File) : String = {
     Files.copy(file, new File(dir, file.getName))
     dir + "/" + file.getName
   }
-
+
 }

core/src/main/scala/org/apache/spark/HttpServer.scala

Lines changed: 3 additions & 3 deletions

@@ -83,19 +83,19 @@ private[spark] class HttpServer(resourceBase: File, securityManager: SecurityMan
     }
   }
 
-  /**
+  /**
    * Setup Jetty to the HashLoginService using a single user with our
    * shared secret. Configure it to use DIGEST-MD5 authentication so that the password
    * isn't passed in plaintext.
    */
   private def setupSecurityHandler(securityMgr: SecurityManager): ConstraintSecurityHandler = {
     val constraint = new Constraint()
-    // use DIGEST-MD5 as the authentication mechanism
+    // use DIGEST-MD5 as the authentication mechanism
     constraint.setName(Constraint.__DIGEST_AUTH)
     constraint.setRoles(Array("user"))
     constraint.setAuthenticate(true)
     constraint.setDataConstraint(Constraint.DC_NONE)
-
+
     val cm = new ConstraintMapping()
     cm.setConstraint(constraint)
     cm.setPathSpec("/*")

core/src/main/scala/org/apache/spark/Partition.scala

Lines changed: 1 addition & 1 deletion

@@ -25,7 +25,7 @@ trait Partition extends Serializable {
    * Get the split's index within its parent RDD
    */
   def index: Int
-
+
   // A better default implementation of HashCode
   override def hashCode(): Int = index
 }

core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 44 additions & 44 deletions

@@ -25,93 +25,93 @@ import org.apache.hadoop.io.Text
 
 import org.apache.spark.deploy.SparkHadoopUtil
 
-/**
- * Spark class responsible for security.
- *
+/**
+ * Spark class responsible for security.
+ *
  * In general this class should be instantiated by the SparkEnv and most components
- * should access it from that. There are some cases where the SparkEnv hasn't been
+ * should access it from that. There are some cases where the SparkEnv hasn't been
  * initialized yet and this class must be instantiated directly.
- *
+ *
  * Spark currently supports authentication via a shared secret.
  * Authentication can be configured to be on via the 'spark.authenticate' configuration
- * parameter. This parameter controls whether the Spark communication protocols do
+ * parameter. This parameter controls whether the Spark communication protocols do
  * authentication using the shared secret. This authentication is a basic handshake to
  * make sure both sides have the same shared secret and are allowed to communicate.
- * If the shared secret is not identical they will not be allowed to communicate.
- *
- * The Spark UI can also be secured by using javax servlet filters. A user may want to
- * secure the UI if it has data that other users should not be allowed to see. The javax
- * servlet filter specified by the user can authenticate the user and then once the user
- * is logged in, Spark can compare that user versus the view acls to make sure they are
- * authorized to view the UI. The configs 'spark.ui.acls.enable' and 'spark.ui.view.acls'
+ * If the shared secret is not identical they will not be allowed to communicate.
+ *
+ * The Spark UI can also be secured by using javax servlet filters. A user may want to
+ * secure the UI if it has data that other users should not be allowed to see. The javax
+ * servlet filter specified by the user can authenticate the user and then once the user
+ * is logged in, Spark can compare that user versus the view acls to make sure they are
+ * authorized to view the UI. The configs 'spark.ui.acls.enable' and 'spark.ui.view.acls'
  * control the behavior of the acls. Note that the person who started the application
  * always has view access to the UI.
  *
  * Spark does not currently support encryption after authentication.
- *
+ *
  * At this point spark has multiple communication protocols that need to be secured and
  * different underlying mechanisms are used depending on the protocol:
  *
- * - Akka -> The only option here is to use the Akka Remote secure-cookie functionality.
- * Akka remoting allows you to specify a secure cookie that will be exchanged
- * and ensured to be identical in the connection handshake between the client
- * and the server. If they are not identical then the client will be refused
- * to connect to the server. There is no control of the underlying
- * authentication mechanism so its not clear if the password is passed in
+ * - Akka -> The only option here is to use the Akka Remote secure-cookie functionality.
+ * Akka remoting allows you to specify a secure cookie that will be exchanged
+ * and ensured to be identical in the connection handshake between the client
+ * and the server. If they are not identical then the client will be refused
+ * to connect to the server. There is no control of the underlying
+ * authentication mechanism so its not clear if the password is passed in
  * plaintext or uses DIGEST-MD5 or some other mechanism.
  * Akka also has an option to turn on SSL, this option is not currently supported
  * but we could add a configuration option in the future.
- *
- * - HTTP for broadcast and file server (via HttpServer) -> Spark currently uses Jetty
- * for the HttpServer. Jetty supports multiple authentication mechanisms -
- * Basic, Digest, Form, Spengo, etc. It also supports multiple different login
+ *
+ * - HTTP for broadcast and file server (via HttpServer) -> Spark currently uses Jetty
+ * for the HttpServer. Jetty supports multiple authentication mechanisms -
+ * Basic, Digest, Form, Spengo, etc. It also supports multiple different login
  * services - Hash, JAAS, Spnego, JDBC, etc. Spark currently uses the HashLoginService
- * to authenticate using DIGEST-MD5 via a single user and the shared secret.
+ * to authenticate using DIGEST-MD5 via a single user and the shared secret.
  * Since we are using DIGEST-MD5, the shared secret is not passed on the wire
  * in plaintext.
  * We currently do not support SSL (https), but Jetty can be configured to use it
  * so we could add a configuration option for this in the future.
- *
+ *
  * The Spark HttpServer installs the HashLoginServer and configures it to DIGEST-MD5.
- * Any clients must specify the user and password. There is a default
+ * Any clients must specify the user and password. There is a default
  * Authenticator installed in the SecurityManager to how it does the authentication
  * and in this case gets the user name and password from the request.
  *
- * - ConnectionManager -> The Spark ConnectionManager uses java nio to asynchronously
- * exchange messages. For this we use the Java SASL
- * (Simple Authentication and Security Layer) API and again use DIGEST-MD5
+ * - ConnectionManager -> The Spark ConnectionManager uses java nio to asynchronously
+ * exchange messages. For this we use the Java SASL
+ * (Simple Authentication and Security Layer) API and again use DIGEST-MD5
  * as the authentication mechanism. This means the shared secret is not passed
  * over the wire in plaintext.
  * Note that SASL is pluggable as to what mechanism it uses. We currently use
  * DIGEST-MD5 but this could be changed to use Kerberos or other in the future.
  * Spark currently supports "auth" for the quality of protection, which means
  * the connection is not supporting integrity or privacy protection (encryption)
- * after authentication. SASL also supports "auth-int" and "auth-conf" which
+ * after authentication. SASL also supports "auth-int" and "auth-conf" which
  * SPARK could be support in the future to allow the user to specify the quality
- * of protection they want. If we support those, the messages will also have to
+ * of protection they want. If we support those, the messages will also have to
  * be wrapped and unwrapped via the SaslServer/SaslClient.wrap/unwrap API's.
- *
- * Since the connectionManager does asynchronous messages passing, the SASL
+ *
+ * Since the connectionManager does asynchronous messages passing, the SASL
  * authentication is a bit more complex. A ConnectionManager can be both a client
  * and a Server, so for a particular connection is has to determine what to do.
- * A ConnectionId was added to be able to track connections and is used to
+ * A ConnectionId was added to be able to track connections and is used to
  * match up incoming messages with connections waiting for authentication.
  * If its acting as a client and trying to send a message to another ConnectionManager,
  * it blocks the thread calling sendMessage until the SASL negotiation has occurred.
  * The ConnectionManager tracks all the sendingConnections using the ConnectionId
  * and waits for the response from the server and does the handshake.
  *
- * - HTTP for the Spark UI -> the UI was changed to use servlets so that javax servlet filters
+ * - HTTP for the Spark UI -> the UI was changed to use servlets so that javax servlet filters
  * can be used. Yarn requires a specific AmIpFilter be installed for security to work
  * properly. For non-Yarn deployments, users can write a filter to go through a
  * companies normal login service. If an authentication filter is in place then the
  * SparkUI can be configured to check the logged in user against the list of users who
  * have view acls to see if that user is authorized.
- * The filters can also be used for many different purposes. For instance filters
+ * The filters can also be used for many different purposes. For instance filters
  * could be used for logging, encryption, or compression.
- *
+ *
  * The exact mechanisms used to generate/distributed the shared secret is deployment specific.
- *
+ *
  * For Yarn deployments, the secret is automatically generated using the Akka remote
  * Crypt.generateSecureCookie() API. The secret is placed in the Hadoop UGI which gets passed
  * around via the Hadoop RPC mechanism. Hadoop RPC can be configured to support different levels
@@ -121,7 +121,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
  * to reduce the possibility of web based attacks through YARN. Hadoop can be configured to use
  * filters to do authentication. That authentication then happens via the ResourceManager Proxy
  * and Spark will use that to do authorization against the view acls.
- *
+ *
  * For other Spark deployments, the shared secret must be specified via the
  * spark.authenticate.secret config.
  * All the nodes (Master and Workers) and the applications need to have the same shared secret.
@@ -152,7 +152,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
     " are ui acls enabled: " + uiAclsOn + " users with view permissions: " + viewAcls.toString())
 
   // Set our own authenticator to properly negotiate user/password for HTTP connections.
-  // This is needed by the HTTP client fetching from the HttpServer. Put here so its
+  // This is needed by the HTTP client fetching from the HttpServer. Put here so its
   // only set once.
   if (authOn) {
     Authenticator.setDefault(
@@ -214,12 +214,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
   def uiAclsEnabled(): Boolean = uiAclsOn
 
   /**
-   * Checks the given user against the view acl list to see if they have
+   * Checks the given user against the view acl list to see if they have
    * authorization to view the UI. If the UI acls must are disabled
    * via spark.ui.acls.enable, all users have view access.
-   *
+   *
    * @param user to see if is authorized
-   * @return true is the user has permission, otherwise false
+   * @return true is the user has permission, otherwise false
    */
   def checkUIViewPermissions(user: String): Boolean = {
     if (uiAclsEnabled() && (user != null) && (!viewAcls.contains(user))) false else true
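
The scaladoc above names the knobs that drive this class: 'spark.authenticate' and 'spark.authenticate.secret' turn on shared-secret authentication, while 'spark.ui.acls.enable' and 'spark.ui.view.acls' feed the view check shown in the last hunk. As a rough, hypothetical illustration (not part of this commit), here is how those pieces fit together; the object name and values are made up, and the sketch assumes it lives in the org.apache.spark package since SecurityManager is private[spark].

// Hypothetical sketch, not part of this commit: wiring the configs described in the
// SecurityManager scaladoc to the checkUIViewPermissions method shown above.
package org.apache.spark

object SecurityManagerSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.authenticate", "true")           // enable shared-secret authentication
      .set("spark.authenticate.secret", "s3cret")  // required outside of YARN deployments
      .set("spark.ui.acls.enable", "true")         // enforce view acls on the web UI
      .set("spark.ui.view.acls", "alice,bob")      // users allowed to view the UI

    val securityManager = new SecurityManager(conf)

    // True when acls are disabled or the user is in the view acls (the user who
    // started the application always has view access).
    println(securityManager.checkUIViewPermissions("alice"))    // true
    println(securityManager.checkUIViewPermissions("mallory"))  // false, unless the
                                                                // process runs as mallory
  }
}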

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 10 additions & 0 deletions

@@ -1137,6 +1137,16 @@ class SparkContext(config: SparkConf) extends Logging {
     dagScheduler.cancelAllJobs()
   }
 
+  /** Cancel a given job if it's scheduled or running */
+  private[spark] def cancelJob(jobId: Int) {
+    dagScheduler.cancelJob(jobId)
+  }
+
+  /** Cancel a given stage and all jobs associated with it */
+  private[spark] def cancelStage(stageId: Int) {
+    dagScheduler.cancelStage(stageId)
+  }
+
   /**
    * Clean a closure to make it ready to serialized and send to tasks
    * (removes unreferenced variables in $outer's, updates REPL variables)
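
The two methods added here are private[spark], so they are not callable from application code; the public cancellation surface on SparkContext is cancelAllJobs() together with job groups. A minimal, hypothetical sketch of that public path (the group name, app name, and workload are illustrative only):

// Hypothetical usage sketch, not part of this commit: public job cancellation that
// parallels the new private[spark] cancelJob/cancelStage hooks above.
import org.apache.spark.{SparkConf, SparkContext}

object CancellationSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("cancel-sketch"))

    val worker = new Thread(new Runnable {
      def run(): Unit = {
        // Job groups are tracked per thread, so tag the work on the thread that runs it.
        sc.setJobGroup("demo-group", "illustrative long-running job")
        try {
          sc.parallelize(1 to 1000000).map { i => Thread.sleep(1); i }.count()
        } catch {
          case e: Exception => println("job was cancelled: " + e.getMessage)
        }
      }
    })
    worker.start()

    Thread.sleep(2000)               // let some tasks start; timing is illustrative
    sc.cancelJobGroup("demo-group")  // cancel every job tagged with the group
    // sc.cancelAllJobs()            // alternative: cancel everything scheduled or running

    worker.join()
    sc.stop()
  }
}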

core/src/main/scala/org/apache/spark/SparkException.scala

Lines changed: 1 addition & 1 deletion

@@ -20,5 +20,5 @@ package org.apache.spark
 class SparkException(message: String, cause: Throwable)
   extends Exception(message, cause) {
 
-  def this(message: String) = this(message, null)
+  def this(message: String) = this(message, null)
 }

core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala

Lines changed: 10 additions & 10 deletions

@@ -42,7 +42,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
 
   private val now = new Date()
   private val conf = new SerializableWritable(jobConf)
-
+
   private var jobID = 0
   private var splitID = 0
   private var attemptID = 0
@@ -58,8 +58,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
   def preSetup() {
     setIDs(0, 0, 0)
     HadoopRDD.addLocalConfiguration("", 0, 0, 0, conf.value)
-
-    val jCtxt = getJobContext()
+
+    val jCtxt = getJobContext()
     getOutputCommitter().setupJob(jCtxt)
   }
 
@@ -74,7 +74,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
     val numfmt = NumberFormat.getInstance()
     numfmt.setMinimumIntegerDigits(5)
     numfmt.setGroupingUsed(false)
-
+
     val outputName = "part-" + numfmt.format(splitID)
     val path = FileOutputFormat.getOutputPath(conf.value)
     val fs: FileSystem = {
@@ -85,7 +85,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
       }
     }
 
-    getOutputCommitter().setupTask(getTaskContext())
+    getOutputCommitter().setupTask(getTaskContext())
     writer = getOutputFormat().getRecordWriter(fs, conf.value, outputName, Reporter.NULL)
   }
 
@@ -103,18 +103,18 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
 
   def commit() {
     val taCtxt = getTaskContext()
-    val cmtr = getOutputCommitter()
+    val cmtr = getOutputCommitter()
     if (cmtr.needsTaskCommit(taCtxt)) {
       try {
        cmtr.commitTask(taCtxt)
        logInfo (taID + ": Committed")
      } catch {
-        case e: IOException => {
+        case e: IOException => {
          logError("Error committing the output of task: " + taID.value, e)
          cmtr.abortTask(taCtxt)
          throw e
        }
-      }
+      }
     } else {
       logWarning ("No need to commit output of task: " + taID.value)
     }
@@ -144,7 +144,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
   }
 
   private def getJobContext(): JobContext = {
-    if (jobContext == null) {
+    if (jobContext == null) {
       jobContext = newJobContext(conf.value, jID.value)
     }
     jobContext
@@ -175,7 +175,7 @@ object SparkHadoopWriter {
     val jobtrackerID = formatter.format(time)
     new JobID(jobtrackerID, id)
   }
-
+
   def createPathFromString(path: String, conf: JobConf): Path = {
     if (path == null) {
       throw new IllegalArgumentException("Output path is null")
