Skip to content

[Spark-15155][Mesos] Optionally ignore default role resources #12933

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/running-on-mesos.md
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,13 @@ See the [configuration page](configuration.html) for information on Spark config
and resource weight sharing.
</td>
</tr>
<tr>
<td><code>spark.mesos.ignoreDefaultRoleResources</code></td>
<td>false</td>
<td>
Only if `spark.mesos.role` has been set, ignore mesos resources with the role `*`.
</td>
</tr>
<tr>
<td><code>spark.mesos.constraints</code></td>
<td>(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ private[spark] class MesosDriverDescription(
val cores: Double,
val supervise: Boolean,
val command: Command,
schedulerProperties: Map[String, String],
val schedulerProperties: Map[String, String],
val submissionId: String,
val submissionDate: Date,
val retryState: Option[MesosClusterRetryState] = None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,12 +551,16 @@ private[spark] class MesosClusterScheduler(
currentOffers: List[ResourceOffer],
tasks: mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]): Unit = {
for (submission <- candidates) {
val acceptedResourceRoles = getAcceptedResourceRoles(submission.schedulerProperties)
val driverCpu = submission.cores
val driverMem = submission.mem
logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")
val offerOption = currentOffers.find { o =>
getResource(o.resources, "cpus") >= driverCpu &&
getResource(o.resources, "mem") >= driverMem
val acceptableResources = o.resources.asScala
.filter((r: Resource) => acceptedResourceRoles(r.getRole))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a style thing, but I think you'd generally just write (r => acceptedResourceRoles(r.getRole))

.asJava
getResource(acceptableResources, "cpus") >= driverCpu &&
getResource(acceptableResources, "mem") >= driverMem
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indent this continuation 2 spaces to make it clear it's a continuation

}
if (offerOption.isEmpty) {
logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId}, " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
val remainingResources = mutable.Map(offers.map(offer =>
(offer.getId.getValue, offer.getResourcesList)): _*)

val acceptedResourceRoles = getAcceptedResourceRoles(sc.conf)

var launchTasks = true

// TODO(mgummelt): combine offers for a single slave
Expand All @@ -393,15 +395,19 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
for (offer <- offers) {
val slaveId = offer.getSlaveId.getValue
val offerId = offer.getId.getValue
val resources = remainingResources(offerId)
val resources =
remainingResources(offerId).asScala
.filter((r: Resource) => acceptedResourceRoles(r.getRole))
.asJava

if (canLaunchTask(slaveId, resources)) {
if (canLaunchTask(slaveId, resources, acceptedResourceRoles)) {
// Create a task
launchTasks = true
val taskId = newMesosTaskId()
val offerCPUs = getResource(resources, "cpus").toInt
val taskGPUs = Math.min(
Math.max(0, maxGpus - totalGpusAcquired), getResource(resources, "gpus").toInt)
Math.max(0, maxGpus - totalGpusAcquired),
getResource(resources, "gpus").toInt)

val taskCPUs = executorCores(offerCPUs)
val taskMemory = executorMemory(sc)
Expand Down Expand Up @@ -466,7 +472,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
cpuResourcesToUse ++ memResourcesToUse ++ portResourcesToUse ++ gpuResourcesToUse)
}

private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean = {
private def canLaunchTask(
slaveId: String,
resources: JList[Resource],
acceptedResourceRoles: Set[String]): Boolean = {
val offerMem = getResource(resources, "mem")
val offerCPUs = getResource(resources, "cpus").toInt
val cpus = executorCores(offerCPUs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,15 @@ private[spark] class MesosFineGrainedSchedulerBackend(
.setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build())
}

val acceptedResourceRoles = getAcceptedResourceRoles(sc.conf)

// Of the matching constraints, see which ones give us enough memory and cores
val (usableOffers, unUsableOffers) = offersMatchingConstraints.partition { o =>
val mem = getResource(o.getResourcesList, "mem")
val cpus = getResource(o.getResourcesList, "cpus")
val acceptableResources = o.getResourcesList.asScala
.filter((r: Resource) => acceptedResourceRoles(r.getRole))
.asJava
val mem = getResource(acceptableResources, "mem")
val cpus = getResource(acceptableResources, "cpus")
val slaveId = o.getSlaveId.getValue
val offerAttributes = toAttributeMap(o.getAttributesList)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,44 @@ trait MesosSchedulerUtils extends Logging {
// Driver for talking to Mesos
protected var mesosDriver: SchedulerDriver = null

/**
* Returns the configured set of roles that an offer can be selected from
* @param conf Spark configuration
*/
protected def getAcceptedResourceRoles(conf: SparkConf): Set[String] = {
getAcceptedResourceRoles(
conf.getBoolean("spark.mesos.ignoreDefaultRoleResources", false),
conf.getOption("spark.mesos.role"))
}
/**
* Returns the configured set of roles that an offer can be selected from
* @param props Mesos driver description schedulerProperties map
*/
protected def getAcceptedResourceRoles(props: Map[String, String]): Set[String] = {
getAcceptedResourceRoles(
props.get("spark.mesos.ignoreDefaultRoleResources") match {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be my taste only but I'm finding this style hard to read.
props.get(...).map(_.toBoolean).getOrElse(false)? It's what I'd expect to see in the spark code base but don't know if it's objectively any better. Still I'd break this out into a val rather than squeeze this into a method arg

case Some(truth) => truth.toBoolean
case None => false
},
props.get("spark.mesos.role"))
}
/**
* Internal version of getAcceptedResourceRoles
* @param ignoreDefaultRoleResources user specified property
* @param role user specified property
*/
private def getAcceptedResourceRoles(
ignoreDefaultRoleResources: Boolean,
role: Option[String]) = {
val roles = ignoreDefaultRoleResources match {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems more readable:

val roles = if (ignoreDefaultRoleResources && role.isDefined) Set(role) else Set(Some("*"), role)

case true if role.isDefined => Set(role)
case _ => Set(Some("*"), role)
}
val acceptedRoles = roles.flatten
logDebug(s"Accepting resources from role(s): ${acceptedRoles.mkString(",")}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should move this log outside of this helper method, as there might be other context in the future that's calling this.

acceptedRoles
}

/**
* Creates a new MesosSchedulerDriver that communicates to the Mesos master.
*
Expand Down
Loading