Skip to content

[SPARK-3456] YarnAllocator on alpha can lose container requests to RM #2373

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,17 @@ private[yarn] class YarnAllocationHandler(
private val lastResponseId = new AtomicInteger()
private val releaseList: CopyOnWriteArrayList[ContainerId] = new CopyOnWriteArrayList()

override protected def allocateContainers(count: Int): YarnAllocateResponse = {
override protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse = {
var resourceRequests: List[ResourceRequest] = null

logDebug("numExecutors: " + count)
logDebug("asking for additional executors: " + count + " with already pending: " + pending)
val totalNumAsk = count + pending
if (count <= 0) {
resourceRequests = List()
} else if (preferredHostToCount.isEmpty) {
logDebug("host preferences is empty")
resourceRequests = List(createResourceRequest(
AllocationType.ANY, null, count, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY))
AllocationType.ANY, null, totalNumAsk, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY))
} else {
// request for all hosts in preferred nodes and for numExecutors -
// candidates.size, request by default allocation policy.
Expand All @@ -80,7 +81,7 @@ private[yarn] class YarnAllocationHandler(
val anyContainerRequests: ResourceRequest = createResourceRequest(
AllocationType.ANY,
resource = null,
count,
totalNumAsk,
YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)

val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](
Expand All @@ -103,7 +104,7 @@ private[yarn] class YarnAllocationHandler(
req.addAllReleases(releasedContainerList)

if (count > 0) {
logInfo("Allocating %d executor containers with %d of memory each.".format(count,
logInfo("Allocating %d executor containers with %d of memory each.".format(totalNumAsk,
executorMemory + memoryOverhead))
} else {
logDebug("Empty allocation req .. release : " + releasedContainerList)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ private[yarn] abstract class YarnAllocator(
def allocateResources() = {
val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()

// this is needed by alpha, do it here since we add numPending right after this
val executorsPending = numPendingAllocate.get()

if (missing > 0) {
numPendingAllocate.addAndGet(missing)
logInfo("Will Allocate %d executor containers, each with %d memory".format(
Expand All @@ -121,7 +124,7 @@ private[yarn] abstract class YarnAllocator(
logDebug("Empty allocation request ...")
}

val allocateResponse = allocateContainers(missing)
val allocateResponse = allocateContainers(missing, executorsPending)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we expose the number of requests pending to the subclass, we don't need to make this take an extra parameter. E.g.

protected def numContainersPending = numPendingAllocate.get()

Then the alpha YarnAllocationHandler#allocateContainers can add this to count instead of the new parameter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True but it seems kind of weird to have it pass count but then grab a global for pending. It would also be easier to unit test taking it as paramater (if we had unit tests for this)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just thought it would simplify the method signatures. Right now the stable code takes in a parameter that it doesn't use, and in the alpha code all we do with those two parameters is add them together anyway. The number of pending container requests is a property of the allocator, so I think it makes sense if all methods in the class have access to this. (Not a huge deal)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the concern over the interfaces, but in the end hopefully this doesn't matter as hopefully we will be removing alpha support in another release. Also its an internal private interface.

To me this is safer to have one unused variable vs exposing a private variable. For instance, you will notice that if I removed all changes to YarnAllocator (except for making numPendingAllocate protected) and just grabbed it from the alpha/YarnAllocateHandler, it would be wrong - it would ask for 2x the number because we increment the numberPendig before calling allocateContainers. To me exposing that variable could more easily lead to issues like that and perhaps others accessing it and leading to more complicated usage numPending in the future. It also seems more brittle if someone comes in here and refactors things again.

I could change YarnAllocator to rearrange it to call allocateContainers before incrementing numPending for now and put comment there saying the order matters.

I would prefer to leave it, but If you would really like me to I will.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's safe to expose the variable as long as it does what its name suggests. We actually already do this elsewhere in the same class for similar variables (e.g. getNumExecutorRunning or getNumExecutorFailed). In our case, we can't make just those changes because we increment numPendingAllocate before we allocateContainers, but we can easily work around that as you mentioned. These allocation requests aren't technically pending until we've submitted them, so I think it's actually more correct to increment numPendingAllocate afterwards.

Anyway, just a minor suggestion since this is an internal API. It's fine to leave it as is if you prefer.

val allocatedContainers = allocateResponse.getAllocatedContainers()

if (allocatedContainers.size > 0) {
Expand Down Expand Up @@ -435,9 +438,10 @@ private[yarn] abstract class YarnAllocator(
*
* @param count Number of containers to allocate.
* If zero, should still contact RM (as a heartbeat).
* @param pending Number of containers pending allocate. Only used on alpha.
* @return Response to the allocation request.
*/
protected def allocateContainers(count: Int): YarnAllocateResponse
protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse

/** Called to release a previously allocated container. */
protected def releaseContainer(container: Container): Unit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ private[yarn] class YarnAllocationHandler(
amClient.releaseAssignedContainer(container.getId())
}

override protected def allocateContainers(count: Int): YarnAllocateResponse = {
// pending isn't used on stable as the AMRMClient handles incremental asks
override protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse = {
addResourceRequests(count)

// We have already set the container request. Poll the ResourceManager for a response.
Expand Down