[SPARK-3477] Clean up code in Yarn Client / ClientBase #2350

Conversation
Clearly, a lot of documentation and further work still needs to be done in ClientBase.scala; this is only the first checkpoint.
We implement a while loop to monitor an application's state in four separate places (stable/Client, alpha/Client, and twice in YarnClientSchedulerBackend). This commit reduces these four loops to one.
The getClientToken (or getClientToAMToken) method in Hadoop apparently returns null sometimes, so we need to prevent the NPEs that result. Yay documentation.
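As an illustration only, here is a minimal sketch of what a single consolidated monitoring loop might look like, assuming the stable `YarnClient` API; the method name, polling interval, and terminal-state handling are assumptions rather than the exact code in this PR:

```scala
import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient

// Hypothetical consolidated loop replacing the four copies mentioned above.
def monitorApplication(yarnClient: YarnClient, appId: ApplicationId): YarnApplicationState = {
  while (true) {
    Thread.sleep(1000)  // assumed polling interval
    val report = yarnClient.getApplicationReport(appId)
    // getClientToAMToken may return null, so wrap it in an Option before use
    // (shown only to illustrate the guard against the NPE noted above)
    val clientToken = Option(report.getClientToAMToken)
    val state = report.getYarnApplicationState
    if (state == YarnApplicationState.FINISHED ||
        state == YarnApplicationState.FAILED ||
        state == YarnApplicationState.KILLED) {
      return state
    }
  }
  throw new IllegalStateException("never reached; the loop only exits via return")
}
```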
```scala
(numExecutors <= 0) -> "You must specify at least 1 executor!",
(amMemory <= amMemoryOverhead) -> s"AM memory must be > $amMemoryOverhead MB",
(executorMemory <= executorMemoryOverhead) ->
  s"Executor memory must be > $executorMemoryOverhead MB"
```
@tgravescs You had comments about some of these checks being outdated in a separate PR. Which ones are no longer relevant?
The memory ones are; see https://issues.apache.org/jira/browse/SPARK-3476
I would say just leave it as is for this PR, and we can handle it under the JIRA I filed.
Alright, I'll leave these as is for now.
I removed them here: #2528
QA tests have started for PR 2350 at commit
QA tests have finished for PR 2350 at commit
@vanzin @tgravescs It would be great if you could do a thorough review. I will do one myself, but it would be good to have more eyes on this. @tgravescs it would be really helpful if you could test the basic applications on an alpha Yarn cluster in particular. Thanks a lot in advance.
@andrewor14 thanks for working on this. Cleanup and commonizing are good, but we should try to minimize these huge changes; they are really hard to review adequately and take a lot of time. Going forward, let's try to break changes like this up into smaller ones when possible.
No problem. This is actually only the first set of changes I intend to push out. There is still more cleanup to be done in other parts of the Yarn code, but I decided to leave it out of this PR. I can submit smaller patches for the rest of it.
Thanks Andrew. What specific JIRA are you referring to?
```scala
validateArgs()

/** Load any default arguments provided through environment variables and Spark properties. */
private def loadDefaultArgs(): Unit = {
```
The name of this function seems a bit misleading; this is just reading configs, not loading default arguments.
There's only so much abstraction that you could do before it gets too complicated.
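To make the distinction concrete, here is a rough sketch of the "reading configs" behavior under discussion; the property key, environment variable, and default value are hypothetical:

```scala
import org.apache.spark.SparkConf

// Resolve a setting from a Spark property, falling back to an environment
// variable and then a hard-coded default. All names here are illustrative.
def resolveAmMemory(sparkConf: SparkConf): Int = {
  sparkConf.getOption("spark.yarn.am.memory")
    .orElse(sys.env.get("SPARK_YARN_AM_MEMORY"))
    .map(_.stripSuffix("m").toInt)
    .getOrElse(512)
}
```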
QA tests have started for PR 2350 at commit
QA tests have finished for PR 2350 at commit
QA tests have started for PR 2350 at commit
QA tests have finished for PR 2350 at commit
(1) It now provides a more loggable format if the value is null or "". (2) The application report details can change over time, so instead of logging them once at the beginning, we log them every time the application state changes.
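A small sketch of both points, with assumed names (`formatValue`, `lastState`) and `println` standing in for the actual logger:

```scala
import org.apache.hadoop.yarn.api.records.YarnApplicationState

// (1) Render null or empty values in a loggable way; "N/A" is an assumption.
def formatValue(value: String): String =
  if (value == null || value.isEmpty) "N/A" else value

// (2) Log the report details only when the application state changes.
var lastState: YarnApplicationState = null
def logIfStateChanged(state: YarnApplicationState): Unit = {
  if (state != lastState) {
    println(s"Application state changed to $state")
    lastState = state
  }
}
```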
test this please
QA tests have started for PR 2350 at commit
QA tests have finished for PR 2350 at commit
@vanzin @tgravescs I think I have addressed all of your comments. Let me know if I missed something. Can you look at the latest changes? I have tested the basic applications on a Hadoop 2.4 cluster. |
```scala
 * Copy the given file to a remote file system if needed. This is used for preparing
 * resources for launching the ApplicationMaster container. Exposed for testing.
 */
def copyFileToRemote(
```
Personally, I prefer the original name of this function. The way we are using it, it's actually copying a file from a remote location to the local HDFS, so "copy to remote" doesn't make sense to me. Obviously, since it takes both dest and src, it could do either, so perhaps we should make the name more generic and not include "to" or "from". It's really "copy to dest if fs is different". I don't have a good short name for that, though.
I was under the impression that the source file is always local; the only usages of this are in ClientBase#prepareLocalResources. In the old code:

L213: val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), ...)
L234: val destPath = copyRemoteFile(dst, localPath, replication)

So aren't we copying a local file to HDFS instead?
It's supposed to support copying from locations other than just file://, like another HDFS location. The local variable is badly named (localPath); it's actually reading whatever the user specified for the app jar, Spark jar, etc.
I guess it depends on the definition of "remote" here. In general, when running on YARN, I wouldn't consider the HDFS installation on that YARN cluster remote. But it's all in the perception. I'm fine with the name as long as we are clear in the description of what it does.
Alright, I'll update the comment to make it more explicit.
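For reference, a simplified sketch of the "copy to dest if fs is different" behavior discussed in this thread, using standard Hadoop APIs; replication settings and Spark's exact file-system comparison are omitted:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

// Copy srcPath into destDir only when the two paths live on different
// file systems; otherwise the source is already visible at the destination.
def copyFileToRemote(conf: Configuration, srcPath: Path, destDir: Path): Path = {
  val srcFs = srcPath.getFileSystem(conf)
  val destFs = destDir.getFileSystem(conf)
  val destPath = new Path(destDir, srcPath.getName)
  if (srcFs.getUri != destFs.getUri) {  // simplified comparison
    FileUtil.copy(srcFs, srcPath, destFs, destPath, false, conf)
  }
  destFs.makeQualified(destPath)
}
```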
This mostly looks good; a couple of minor comments is all. I do also still want to run through some tests on alpha.
```scala
val state = report.getYarnApplicationState

if (logApplicationReport) {
  logInfo(s"Application report from ResourceManager for app ${appId.getId} (state: $state)")
```
It seems like we wouldn't need the "from ResourceManager" here. Also, could we put the full application id here instead of just the last bit? It's much easier to copy and paste if the user wants to grab it and use it in a yarn command or the UI.
Ok
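For illustration, logging the full id only requires using the `ApplicationId` itself instead of `getId`; its string form (e.g. `application_1410901234567_0042`) is what users paste into `yarn application -status`:

```scala
import org.apache.hadoop.yarn.api.records.ApplicationId

// Hypothetical helper: build the report line with the full application id.
def reportLine(appId: ApplicationId, state: String): String =
  s"Application report for $appId (state: $state)"
```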
I gave it another look and it looks OK to me. I'm sure that if there are any regressions, we'll find out pretty quickly.
Hi @tgravescs I have addressed your recent comments. Have you had a chance to test this on an alpha cluster? |
QA tests have started for PR 2350 at commit
QA tests have finished for PR 2350 at commit
Yes, I had tested an earlier version and didn't find any issues. Let me look at this tomorrow, and I'll try it out again on this hopefully final version.
Ran through some tests on 0.23 and it seems to be working fine. I'll commit this. Thanks @andrewor14!
Great, thanks for reviewing @tgravescs and @vanzin |
This is part of a broader effort to clean up the Yarn integration code after #2020.
The high-level changes in this PR include removing duplicated code across the alpha and stable APIs, simplifying method signatures, and clarifying confusing names and documentation.
I have tested the stable API on a Hadoop 2.4 cluster. I tested submitting a jar that references classes in other jars in both client and cluster mode. I also made changes in the alpha API, though I do not have access to an alpha cluster. I have verified that it compiles, but it would be ideal if others can help test it.
For those interested in some examples in detail, please read on.
_Appendix_

- The loop that polls `getApplicationReport` from the RM is duplicated in 4 places: in the stable `Client`, alpha `Client`, and twice in `YarnClientSchedulerBackend`. We should not have different loops for client and cluster deploy modes.
- `ClientBase#getLocalPath` returns `null` on certain conditions, and its only caller `ClientBase#addFileToClasspath` checks whether the value returned is `null`. We could just have the caller check on that same condition to avoid passing `null`s around (see the sketch after this list).
- In `YarnSparkHadoopUtil#addToEnvironment`, we take in an argument `classpathSeparator` that always has the same value upstream (i.e. `File.pathSeparator`). This argument is now removed from the signature and from all callers of this method upstream.
- `ClientBase#copyRemoteFile` is now renamed to `copyFileToRemote`. It was unclear whether we are copying a remote file to our local file system, or copying a locally visible file to a remote file system. Also, even the content of the method has inaccurately named variables: we use `val remoteFs` to signify the file system of the locally visible file and `val fs` to signify the remote, destination file system. These are now renamed `srcFs` and `destFs` respectively.
- Some log messages are now logged at `DEBUG`.
- None of the code in these classes (`Client`, `ClientBase`, `YarnSparkHadoopUtil` etc.) is intended to be used by a Spark application (the user should go through Spark submit instead). At the very least they should be `private[spark]`.
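As a before/after sketch of the `getLocalPath` item referenced above, with `isLocalUri` and the classpath buffer as illustrative stand-ins for the actual code:

```scala
import java.net.URI
import scala.collection.mutable.ArrayBuffer

// Spark's "local:" scheme marks files already present on every node.
def isLocalUri(uri: URI): Boolean = uri.getScheme == "local"

// Before: the helper returns null and the caller must check for it.
def getLocalPath(uri: URI): String =
  if (isLocalUri(uri)) uri.getPath else null

// After: the caller checks the same condition directly, so no null changes hands.
def addFileToClasspath(uri: URI, classpath: ArrayBuffer[String]): Unit = {
  if (isLocalUri(uri)) {
    classpath += uri.getPath
  }
}
```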