Skip to content

Commit

Permalink
SAMZA-1089: Enable YarnJob and ClientHelper to kill a job by name rat…
Browse files Browse the repository at this point in the history
…her than YARN ApplicationID

Missed a couple files in the previous commit to enable YarnJob to kill and get status of a Job based on the job name rather than the YARN ApplicationName.

These changes have been manually verified in a Yarn cluster at LI.

Author: Jacob Maes <jmaes@linkedin.com>

Reviewers: Xinyu Liu <xiliu@linkedin.com>

Closes apache#114 from jmakes/samza-1089-3
  • Loading branch information
Jacob Maes committed Apr 7, 2017
1 parent d6bd2d7 commit fb51bfa
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,35 @@ class ClientHelper(conf: Configuration) extends Logging {
appId
}

/**
* Gets the list of Yarn [[org.apache.hadoop.yarn.api.records.ApplicationId]]
* corresponding to the specified appName and are "active".
* <p>
* In this context, "active" means that the application is starting or running
* and is not in any terminated state.
* <p>
* In Samza, an appName should be unique and there should only be one active
* applicationId for a given appName, but this can be violated in unusual cases
* like while troubleshooting a new application. So, this method returns as many
* active application ids as it finds.
*
* @param appName the app name as found in the Name column in the Yarn application list.
* @return the active application ids.
*/
def getActiveApplicationIds(appName: String): List[ApplicationId] = {
val getAppsRsp = yarnClient.getApplications

getAppsRsp
.asScala
.filter(appRep => ((
Running.equals(convertState(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get)
|| New.equals(convertState(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get)
)
&& appName.equals(appRep.getName)))
.map(appRep => appRep.getApplicationId)
.toList
}

def status(appId: ApplicationId): Option[ApplicationStatus] = {
val statusResponse = yarnClient.getApplicationReport(appId)
convertState(statusResponse.getYarnApplicationState, statusResponse.getFinalApplicationStatus)
Expand Down Expand Up @@ -251,7 +280,7 @@ class ClientHelper(conf: Configuration) extends Logging {
private def convertState(state: YarnApplicationState, status: FinalApplicationStatus): Option[ApplicationStatus] = {
(state, status) match {
case (YarnApplicationState.FINISHED, FinalApplicationStatus.SUCCEEDED) => Some(SuccessfulFinish)
case (YarnApplicationState.KILLED, _) | (YarnApplicationState.FAILED, _) => Some(UnsuccessfulFinish)
case (YarnApplicationState.KILLED, _) | (YarnApplicationState.FAILED, _) | (YarnApplicationState.FINISHED, _) => Some(UnsuccessfulFinish)
case (YarnApplicationState.NEW, _) | (YarnApplicationState.SUBMITTED, _) => Some(New)
case _ => Some(Running)
}
Expand Down Expand Up @@ -337,6 +366,8 @@ class ClientHelper(conf: Configuration) extends Logging {
* Cleanup application staging directory.
*/
def cleanupStagingDir(): Unit = {
YarnJobUtil.cleanupStagingDir(jobContext, FileSystem.get(conf))
if (jobContext != null) {
YarnJobUtil.cleanupStagingDir(jobContext, FileSystem.get(conf))
}
}
}
38 changes: 34 additions & 4 deletions samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
Original file line number Diff line number Diff line change
Expand Up @@ -134,16 +134,22 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
}

def getStatus: ApplicationStatus = {
appId match {
case Some(appId) => client.status(appId).getOrElse(null)
case None => null
getAppId match {
case Some(appId) =>
logger.info("Getting status for applicationId %s" format appId)
client.status(appId).getOrElse(null)
case None =>
logger.info("Unable to report status because no applicationId could be found.")
null
}
}

def kill: YarnJob = {
appId match {
// getAppId only returns one appID. Run multiple times to kill dupes (erroneous case)
getAppId match {
case Some(appId) =>
try {
logger.info("Killing applicationId {}", appId)
client.kill(appId)
} finally {
client.cleanupStagingDir
Expand All @@ -152,4 +158,28 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
}
this
}

private def getAppId: Option[ApplicationId] = {
appId match {
case Some(applicationId) =>
appId
case None =>
// Get by name
config.getName match {
case Some(jobName) =>
val applicationName = "%s_%s" format(jobName, config.getJobId.getOrElse(1))
logger.info("Fetching status from YARN for application name %s" format applicationName)
val applicationIds = client.getActiveApplicationIds(applicationName)

applicationIds.foreach(applicationId => {
logger.info("Found applicationId %s for applicationName %s" format(applicationId, applicationName))
})

// Only return one, because there should only be one.
applicationIds.headOption
case None =>
None
}
}
}
}

0 comments on commit fb51bfa

Please sign in to comment.