
[SPARK-24793][K8s] Enhance spark-submit for app management #23599


Status: Closed · wants to merge 1 commit
61 changes: 52 additions & 9 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -22,9 +22,10 @@ import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
 import java.net.{URI, URL}
 import java.security.PrivilegedExceptionAction
 import java.text.ParseException
-import java.util.UUID
+import java.util.{ServiceLoader, UUID}
 
 import scala.annotation.tailrec
+import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 import scala.util.{Properties, Try}

@@ -96,20 +97,35 @@ private[spark] class SparkSubmit extends Logging {
   }
 
   /**
-   * Kill an existing submission using the REST protocol. Standalone and Mesos cluster mode only.
+   * Kill an existing submission.
    */
   private def kill(args: SparkSubmitArguments): Unit = {
-    new RestSubmissionClient(args.master)
-      .killSubmission(args.submissionToKill)
+    if (RestSubmissionClient.supportsRestClient(args.master)) {
+      new RestSubmissionClient(args.master)
+        .killSubmission(args.submissionToKill)
+    } else {
+      val sparkConf = args.toSparkConf()
+      sparkConf.set("spark.master", args.master)
+      SparkSubmitUtils
+        .getSubmitOperations(args.master)
+        .kill(args.submissionToKill, sparkConf)
+    }
   }
 
   /**
-   * Request the status of an existing submission using the REST protocol.
-   * Standalone and Mesos cluster mode only.
+   * Request the status of an existing submission.
    */
   private def requestStatus(args: SparkSubmitArguments): Unit = {
+    if (RestSubmissionClient.supportsRestClient(args.master)) {
> Review comment (Contributor): It would be cool to file a bug to turn this path into a plugin, like the k8s path.
+      new RestSubmissionClient(args.master)
+        .requestSubmissionStatus(args.submissionToRequestStatusFor)
+    } else {
+      val sparkConf = args.toSparkConf()
+      sparkConf.set("spark.master", args.master)
+      SparkSubmitUtils
+        .getSubmitOperations(args.master)
+        .printSubmissionStatus(args.submissionToRequestStatusFor, sparkConf)
+    }
   }
 
   /** Print version information to the log. */
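The dispatch in both methods hinges on `RestSubmissionClient.supportsRestClient` (added later in this diff), which only tests the master URL prefix. A minimal sketch of how the two paths resolve; the master URLs are illustrative:

```scala
// supportsRestClient checks the prefix against Seq("spark://", "mesos://").
RestSubmissionClient.supportsRestClient("spark://host:6066")       // true  -> REST submission client
RestSubmissionClient.supportsRestClient("mesos://host:7077")       // true  -> REST submission client
RestSubmissionClient.supportsRestClient("k8s://https://host:6443") // false -> ServiceLoader lookup
```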
@@ -320,7 +336,8 @@ private[spark] class SparkSubmit extends Logging {
       }
     }
 
-    args.sparkProperties.foreach { case (k, v) => sparkConf.set(k, v) }
+    // update spark config from args
+    args.toSparkConf(Option(sparkConf))
> Review comment (@skonto, author, Mar 25, 2019): Need to keep this here, as `args` is updated within the body of this method. We should clean it up in another PR: `args` should be immutable, and we should only modify `SparkConf`.
     val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
     val targetDir = Utils.createTempDir()
 
@@ -1348,6 +1365,23 @@ private[spark] object SparkSubmitUtils {
     }
   }
 
+  private[deploy] def getSubmitOperations(master: String): SparkSubmitOperation = {
+    val loader = Utils.getContextOrSparkClassLoader
+    val serviceLoaders =
+      ServiceLoader.load(classOf[SparkSubmitOperation], loader)
+        .asScala
+        .filter(_.supports(master))
+
+    serviceLoaders.size match {
+      case x if x > 1 =>
+        throw new SparkException(s"Multiple($x) external SparkSubmitOperations " +
+          s"clients registered for master url ${master}.")
+      case 1 => serviceLoaders.headOption.get
+      case _ =>
+        throw new IllegalArgumentException(s"No external SparkSubmitOperations " +
+          s"clients found for master url: '$master'")
+    }
+  }
 }
 
 /**
@@ -1360,3 +1394,12 @@ private case class OptionAssigner(
     deployMode: Int,
     clOption: String = null,
     confKey: String = null)
+
+private[spark] trait SparkSubmitOperation {
+
+  def kill(submissionId: String, conf: SparkConf): Unit
+
+  def printSubmissionStatus(submissionId: String, conf: SparkConf): Unit
+
+  def supports(master: String): Boolean
+}
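This trait is the extension point that `getSubmitOperations` resolves through `ServiceLoader`. A minimal sketch of what a downstream implementation and its registration could look like; the package, class name, and behavior below are hypothetical, not this PR's actual K8s client:

```scala
package org.apache.spark.deploy.example // must live under org.apache.spark: the trait is private[spark]

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmitOperation

// Registered via a provider-configuration file on the classpath:
//   META-INF/services/org.apache.spark.deploy.SparkSubmitOperation
// containing the single line: org.apache.spark.deploy.example.ExampleSubmitOperation
class ExampleSubmitOperation extends SparkSubmitOperation {

  // getSubmitOperations(master) picks the one registered implementation
  // whose supports() returns true for the given master URL.
  override def supports(master: String): Boolean = master.startsWith("example://")

  override def kill(submissionId: String, conf: SparkConf): Unit = {
    // Contact the cluster manager and terminate the driver for submissionId.
    println(s"Killing $submissionId on ${conf.get("spark.master")}")
  }

  override def printSubmissionStatus(submissionId: String, conf: SparkConf): Unit = {
    // Query the cluster manager and report the driver's state.
    println(s"$submissionId: state unknown (stub)")
  }
}
```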
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -29,7 +29,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.io.Source
 import scala.util.Try
 
-import org.apache.spark.{SparkException, SparkUserAppException}
+import org.apache.spark.{SparkConf, SparkException, SparkUserAppException}
 import org.apache.spark.deploy.SparkSubmitAction._
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config.DYN_ALLOCATION_ENABLED
@@ -305,19 +305,12 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
   }
 
   private def validateKillArguments(): Unit = {
-    if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
-      error("Killing submissions is only supported in standalone or Mesos mode!")
-    }
> Review comment (@skonto, author): I am removing this part; although we don't fail fast, we will fail when the new API does the validation.
     if (submissionToKill == null) {
       error("Please specify a submission to kill.")
     }
   }
 
   private def validateStatusRequestArguments(): Unit = {
-    if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
-      error(
-        "Requesting submission statuses is only supported in standalone or Mesos mode!")
-    }
     if (submissionToRequestStatusFor == null) {
       error("Please specify a submission to request status for.")
     }
@@ -574,6 +567,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
         |
         | Spark standalone or Mesos with cluster deploy mode only:
         |  --supervise                 If given, restarts the driver on failure.
+        |
+        | Spark standalone, Mesos or K8s with cluster deploy mode only:
         |  --kill SUBMISSION_ID        If given, kills the driver specified.
         |  --status SUBMISSION_ID      If given, requests the status of the driver specified.
         |
@@ -662,4 +657,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
 
   private def error(msg: String): Unit = throw new SparkException(msg)
 
+  private[deploy] def toSparkConf(sparkConf: Option[SparkConf] = None): SparkConf = {
+    // either use an existing config or create a new empty one
+    sparkProperties.foldLeft(sparkConf.getOrElse(new SparkConf())) {
+      case (conf, (k, v)) => conf.set(k, v)
+    }
+  }
 }
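`toSparkConf` has two modes: build a fresh `SparkConf` from the parsed `--conf` entries, or fold those entries onto an existing config, as the `args.toSparkConf(Option(sparkConf))` call in SparkSubmit above does. A quick sketch; the argument values are hypothetical, and since the class is `private[deploy]` this only runs from code in `org.apache.spark.deploy`, such as the suite at the end of this diff:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmitArguments

val args = new SparkSubmitArguments(
  Seq("--conf", "spark.app.name=demo", "--class", "Foo", "app.jar"))

val fresh = args.toSparkConf()                  // new SparkConf holding the --conf entries
val base = new SparkConf().set("spark.ui.enabled", "false")
val merged = args.toSparkConf(Option(base))     // same entries folded onto `base`

assert(fresh.get("spark.app.name") == "demo")
assert(merged.get("spark.ui.enabled") == "false")
```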
core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -61,8 +61,6 @@ import org.apache.spark.util.Utils
 private[spark] class RestSubmissionClient(master: String) extends Logging {
   import RestSubmissionClient._
 
-  private val supportedMasterPrefixes = Seq("spark://", "mesos://")
-
   private val masters: Array[String] = if (master.startsWith("spark://")) {
     Utils.parseStandaloneMasterUrls(master)
   } else {
@@ -409,6 +407,8 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
 
 private[spark] object RestSubmissionClient {
 
+  val supportedMasterPrefixes = Seq("spark://", "mesos://")
+
   // SPARK_HOME and SPARK_CONF_DIR are filtered out because they are usually wrong
   // on the remote machine (SPARK-12345) (SPARK-25934)
   private val BLACKLISTED_SPARK_ENV_VARS = Set("SPARK_ENV_LOADED", "SPARK_HOME", "SPARK_CONF_DIR")
@@ -424,6 +424,10 @@ private[spark] object RestSubmissionClient {
       (k.startsWith("SPARK_") && !BLACKLISTED_SPARK_ENV_VARS.contains(k)) || k.startsWith("MESOS_")
     }
   }
+
+  private[spark] def supportsRestClient(master: String): Boolean = {
+    supportedMasterPrefixes.exists(master.startsWith)
+  }
 }
 
 private[spark] class RestSubmissionClientApp extends SparkApplication {
@@ -456,5 +460,4 @@ private[spark] class RestSubmissionClientApp extends SparkApplication {
     val env = RestSubmissionClient.filterSystemEnvironment(sys.env)
     run(appResource, mainClass, appArgs, conf, env)
   }
-
 }
core/src/main/scala/org/apache/spark/util/CommandLineUtils.scala
@@ -25,8 +25,12 @@ import org.apache.spark.SparkException
  * Contains basic command line parsing functionality and methods to parse some common Spark CLI
  * options.
  */
-private[spark] trait CommandLineUtils {
+private[spark] trait CommandLineUtils extends CommandLineLoggingUtils {
 
+  def main(args: Array[String]): Unit
+}
+
+trait CommandLineLoggingUtils {
   // Exposed for testing
   private[spark] var exitFn: Int => Unit = (exitCode: Int) => System.exit(exitCode)
 
@@ -41,6 +45,4 @@ private[spark] trait CommandLineUtils {
     printMessage("Run with --help for usage help or --verbose for debug output")
     exitFn(1)
   }
-
-  def main(args: Array[String]): Unit
 }
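Pulling the logging and exit helpers into `CommandLineLoggingUtils` lets code that is not a command-line entry point reuse `printMessage` and the test-overridable `exitFn` without having to implement `main()`, which `CommandLineUtils` still requires. A minimal sketch; the package and class are hypothetical, and the members are `private[spark]`, so the mixin has to live under `org.apache.spark`:

```scala
package org.apache.spark.example // hypothetical; needed to see the private[spark] members

import org.apache.spark.util.CommandLineLoggingUtils

// A non-entry-point helper that reuses the shared printing/exit behavior.
class StatusReporter extends CommandLineLoggingUtils {
  def report(found: Boolean): Unit = {
    if (found) {
      printMessage("Submission is running.")
    } else {
      printMessage("Submission not found.")
      exitFn(1) // tests can swap exitFn for a no-op instead of killing the JVM
    }
  }
}
```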
17 changes: 17 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -1239,6 +1239,23 @@ class SparkSubmitSuite
 
     conf.get(nonDelimSpaceFromFile._1) should be ("blah")
   }
+
+  test("get a Spark configuration from arguments") {
+    val testConf = "spark.test.hello" -> "world"
+    val masterConf = "spark.master" -> "yarn"
+    val clArgs = Seq(
+      "--conf", s"${testConf._1}=${testConf._2}",
+      "--conf", s"${masterConf._1}=${masterConf._2}",
+      "--class", "Foo",
+      "app.jar")
+    val conf = new SparkSubmitArguments(clArgs).toSparkConf()
+    Seq(
+      testConf,
+      masterConf
+    ).foreach { case (k, v) =>
+      conf.get(k) should be (v)
+    }
+  }
 }
 
 object SparkSubmitSuite extends SparkFunSuite with TimeLimits {