[SPARK-24793][K8s] Enhance spark-submit for app management #23599
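Summary of the change (as visible in the diff below): spark-submit's --kill and --status operations, previously hard-wired to the REST client used by standalone and Mesos cluster mode, are dispatched through a new SparkSubmitOperation trait that other cluster managers (here, K8s) can implement and register via Java's ServiceLoader.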
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -22,9 +22,10 @@ import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowab
 import java.net.{URI, URL}
 import java.security.PrivilegedExceptionAction
 import java.text.ParseException
-import java.util.UUID
+import java.util.{ServiceLoader, UUID}

 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 import scala.util.{Properties, Try}
@@ -96,20 +97,35 @@ private[spark] class SparkSubmit extends Logging {
   }

   /**
-   * Kill an existing submission using the REST protocol. Standalone and Mesos cluster mode only.
+   * Kill an existing submission.
    */
   private def kill(args: SparkSubmitArguments): Unit = {
-    new RestSubmissionClient(args.master)
-      .killSubmission(args.submissionToKill)
+    if (RestSubmissionClient.supportsRestClient(args.master)) {
+      new RestSubmissionClient(args.master)
+        .killSubmission(args.submissionToKill)
+    } else {
+      val sparkConf = args.toSparkConf()
+      sparkConf.set("spark.master", args.master)
+      SparkSubmitUtils
+        .getSubmitOperations(args.master)
+        .kill(args.submissionToKill, sparkConf)
+    }
   }

   /**
-   * Request the status of an existing submission using the REST protocol.
-   * Standalone and Mesos cluster mode only.
+   * Request the status of an existing submission.
    */
   private def requestStatus(args: SparkSubmitArguments): Unit = {
-    new RestSubmissionClient(args.master)
-      .requestSubmissionStatus(args.submissionToRequestStatusFor)
+    if (RestSubmissionClient.supportsRestClient(args.master)) {
+      new RestSubmissionClient(args.master)
+        .requestSubmissionStatus(args.submissionToRequestStatusFor)
+    } else {
+      val sparkConf = args.toSparkConf()
+      sparkConf.set("spark.master", args.master)
+      SparkSubmitUtils
+        .getSubmitOperations(args.master)
+        .printSubmissionStatus(args.submissionToRequestStatusFor, sparkConf)
+    }
   }

   /** Print version information to the log. */
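With this change the existing app-management flags route to whichever client claims the master URL. A hypothetical K8s invocation (submission ID and API-server address are invented for illustration; --kill, --status and the k8s:// master scheme are the real spark-submit interface):

    spark-submit --master k8s://https://127.0.0.1:8443 --kill spark-pi-1547948636037-driver
    spark-submit --master k8s://https://127.0.0.1:8443 --status spark-pi-1547948636037-driver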
@@ -320,7 +336,8 @@ private[spark] class SparkSubmit extends Logging {
       }
     }

-    args.sparkProperties.foreach { case (k, v) => sparkConf.set(k, v) }
+    // update spark config from args
+    args.toSparkConf(Option(sparkConf))

     val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
     val targetDir = Utils.createTempDir()

Review comment: Need to keep this here as …
@@ -1348,6 +1365,23 @@ private[spark] object SparkSubmitUtils {
     }
   }

+  private[deploy] def getSubmitOperations(master: String): SparkSubmitOperation = {
+    val loader = Utils.getContextOrSparkClassLoader
+    val serviceLoaders =
+      ServiceLoader.load(classOf[SparkSubmitOperation], loader)
+        .asScala
+        .filter(_.supports(master))
+
+    serviceLoaders.size match {
+      case x if x > 1 =>
+        throw new SparkException(s"Multiple($x) external SparkSubmitOperations " +
+          s"clients registered for master url ${master}.")
+      case 1 => serviceLoaders.headOption.get
+      case _ =>
+        throw new IllegalArgumentException(s"No external SparkSubmitOperations " +
+          s"clients found for master url: '$master'")
+    }
+  }
 }

 /**
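The lookup relies on Java's standard ServiceLoader mechanism: getSubmitOperations instantiates every SparkSubmitOperation advertised on the classpath and keeps the single one whose supports() accepts the master URL. A provider jar registers its implementation in a resource file named after the trait's fully qualified name; as a sketch (the implementation class name below is illustrative, not taken from this diff):

    # resource file: META-INF/services/org.apache.spark.deploy.SparkSubmitOperation
    org.apache.spark.deploy.k8s.submit.K8SSparkSubmitOperation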
@@ -1360,3 +1394,12 @@ private case class OptionAssigner(
     deployMode: Int,
     clOption: String = null,
     confKey: String = null)
+
+private[spark] trait SparkSubmitOperation {
+
+  def kill(submissionId: String, conf: SparkConf): Unit
+
+  def printSubmissionStatus(submissionId: String, conf: SparkConf): Unit
+
+  def supports(master: String): Boolean
+}
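For a sense of the contract, here is a minimal sketch of an implementation (invented for illustration; it is not part of this diff). Because the trait is private[spark], a concrete operation has to live under the org.apache.spark package, which is also true of the real cluster-manager modules:

    package org.apache.spark.deploy.toy

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.SparkSubmitOperation

    // Toy operation that claims a made-up "toy://" master scheme and only
    // prints what a real cluster-manager client would do.
    private[spark] class ToySubmitOperation extends SparkSubmitOperation {

      override def kill(submissionId: String, conf: SparkConf): Unit = {
        // A real client would call the cluster manager's API here.
        println(s"Would kill submission $submissionId on ${conf.get("spark.master")}")
      }

      override def printSubmissionStatus(submissionId: String, conf: SparkConf): Unit = {
        println(s"Would report the status of $submissionId on ${conf.get("spark.master")}")
      }

      // getSubmitOperations uses this to pick the client for a master URL.
      override def supports(master: String): Boolean = master.startsWith("toy://")
    }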
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -29,7 +29,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.io.Source
 import scala.util.Try

-import org.apache.spark.{SparkException, SparkUserAppException}
+import org.apache.spark.{SparkConf, SparkException, SparkUserAppException}
 import org.apache.spark.deploy.SparkSubmitAction._
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config.DYN_ALLOCATION_ENABLED
@@ -305,19 +305,12 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   }

   private def validateKillArguments(): Unit = {
-    if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
-      error("Killing submissions is only supported in standalone or Mesos mode!")
-    }
     if (submissionToKill == null) {
       error("Please specify a submission to kill.")
     }
   }

   private def validateStatusRequestArguments(): Unit = {
-    if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
-      error(
-        "Requesting submission statuses is only supported in standalone or Mesos mode!")
-    }
     if (submissionToRequestStatusFor == null) {
       error("Please specify a submission to request status for.")
     }
   }

Review comment: I am removing this part; although we don't fail fast, we will fail when the new API does the validation.
@@ -574,6 +567,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
        |
        | Spark standalone or Mesos with cluster deploy mode only:
        |  --supervise                 If given, restarts the driver on failure.
+        |
+        | Spark standalone, Mesos or K8s with cluster deploy mode only:
        |  --kill SUBMISSION_ID        If given, kills the driver specified.
        |  --status SUBMISSION_ID      If given, requests the status of the driver specified.
        |
@@ -662,4 +657,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   }

   private def error(msg: String): Unit = throw new SparkException(msg)
+
+  private[deploy] def toSparkConf(sparkConf: Option[SparkConf] = None): SparkConf = {
+    // either use an existing config or create a new empty one
+    sparkProperties.foldLeft(sparkConf.getOrElse(new SparkConf())) {
+      case (conf, (k, v)) => conf.set(k, v)
+    }
+  }
 }
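Design note: SparkConf.set mutates the config in place and returns it, so the foldLeft threads a single instance through all of the parsed --conf properties. kill and requestStatus call toSparkConf() with no argument to build a fresh config, while prepareSubmitEnvironment passes Option(sparkConf) so the same properties are layered onto the config it has already assembled.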
Review comment: It would be cool to file a bug to turn this path into a plugin like the k8s path.