Skip to content

SPARK-2604: Incorporating memory overhead of executor during the verification of cluster resources, before starting the application #1571

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa

val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
// Memory for the ApplicationMaster.
capability.setMemory(args.amMemory + memoryOverhead)
capability.setMemory(args.amMemory + amMemoryOverhead)
amContainer.setResource(capability)

appContext.setQueue(args.amQueue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,12 @@ trait ClientBase extends Logging {
val APP_FILE_PERMISSION: FsPermission =
FsPermission.createImmutable(Integer.parseInt("644", 8).toShort)

// Additional memory overhead - in mb.
protected def memoryOverhead: Int = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
// Additional memory overhead for Application Master - in mb.
protected def amMemoryOverhead: Int = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
YarnAllocationHandler.MEMORY_OVERHEAD)

// Additional memory overhead for Executor - in mb.
private def execMemoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
YarnAllocationHandler.MEMORY_OVERHEAD)

// TODO(harvey): This could just go in ClientArguments.
Expand All @@ -76,10 +80,10 @@ trait ClientBase extends Logging {
"Error: You must specify a user jar when running in standalone mode!"),
(args.userClass == null) -> "Error: You must specify a user class!",
(args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!",
(args.amMemory <= memoryOverhead) -> ("Error: AM memory size must be" +
"greater than: " + memoryOverhead),
(args.executorMemory <= memoryOverhead) -> ("Error: Executor memory size" +
"must be greater than: " + memoryOverhead.toString)
(args.amMemory <= amMemoryOverhead) -> ("Error: AM memory size must be" +
"greater than: " + amMemoryOverhead),
(args.executorMemory <= execMemoryOverhead) -> ("Error: Executor memory size" +
"must be greater than: " + execMemoryOverhead.toString)
).foreach { case(cond, errStr) =>
if (cond) {
logError(errStr)
Expand All @@ -97,19 +101,20 @@ trait ClientBase extends Logging {
logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)

// If we have requested more than the cluster's max for a single resource then exit.
if (args.executorMemory > maxMem) {
val execMem = args.executorMemory + execMemoryOverhead
if (execMem > maxMem) {
val errorMessage =
"Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster."
.format(args.executorMemory, maxMem)
"Required executor memory (%d MB) along with overhead ( %d MB), is above the max threshold (%d MB) of this cluster."
.format(args.executorMemory, execMemoryOverhead, maxMem)

logError(errorMessage)
throw new IllegalArgumentException(errorMessage)
}
val amMem = args.amMemory + memoryOverhead
val amMem = args.amMemory + amMemoryOverhead
if (amMem > maxMem) {

val errorMessage = "Required AM memory (%d) is above the max threshold (%d) of this cluster."
.format(args.amMemory, maxMem)
// Build the failure message for an oversized ApplicationMaster request.
// The format string must carry one placeholder per argument: with only two
// placeholders the overhead value is printed as the "max threshold" and
// maxMem is silently dropped. Values are in MB, matching the executor message.
val errorMessage = ("Required AM memory (%d MB) along with overhead (%d MB), is above " +
"the max threshold (%d MB) of this cluster.")
.format(args.amMemory, amMemoryOverhead, maxMem)
logError(errorMessage)
throw new IllegalArgumentException(errorMessage)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa

// Memory for the ApplicationMaster.
val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
memoryResource.setMemory(args.amMemory + memoryOverhead)
memoryResource.setMemory(args.amMemory + amMemoryOverhead)
appContext.setResource(memoryResource)

// Finally, submit and monitor the application.
Expand Down