Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

closes #1151: delete multiple jobs from sge at once #1159

Merged
merged 5 commits into from
Jan 3, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
simplify job sweep deletion
- qdel is not necessary on DB sweep
- no need to fetch the job on batch delete old jobs (only drawback is that the watchers will need to refresh in order to see it removed)
  • Loading branch information
felixgabler committed Dec 1, 2020
commit 30a73edf633c399d1bd5d11e7f6fbd0744836095
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,4 @@ object Qdel {

def run(sgeJobId: String): Int = s"qdel $sgeJobId" !

def runMultiple(sgeJobIds: List[String]): Int = s"qdel ${sgeJobIds.mkString(" ")}" !

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ import de.proteinevolution.tel.execution.{ExecutionContext, WrapperExecutionFact
import de.proteinevolution.tel.runscripts.Runscript.Evaluation
import de.proteinevolution.tel.runscripts._
import javax.inject.Inject
import play.api.Configuration
import play.api.cache.{NamedCache, SyncCacheApi}
import play.api.libs.mailer.MailerClient
import play.api.Configuration
import reactivemongo.api.bson.BSONDateTime

import scala.concurrent.Future
Expand Down Expand Up @@ -152,7 +152,7 @@ class JobActor @Inject()(
}
}

private def removeJob(jobID: String): Unit = {
private def updateJobLogAndExecution(jobID: String): Unit = {
// Save Job Event Log to the collection and remove it from the map afterwards
if (currentJobLogs.contains(jobID)) {
currentJobs = currentJobs.filter(_._1 != jobID)
Expand All @@ -169,18 +169,20 @@ class JobActor @Inject()(
}
}

private def delete(job: Job): Future[Unit] = {
s"${constants.jobPath}${job.jobID}".toFile.delete(swallowIOExceptions = true)
removeJob(job.jobID) // Remove the job from the current job map
val foundWatchers = job.watchList.flatMap(userID => wsActorCache.get(userID): Option[List[ActorRef]])
foundWatchers.flatten.foreach(_ ! ClearJob(job.jobID))
job.clusterData.foreach(clusterData => Qdel.run(clusterData.sgeID))
private def deleteJob(jobID: String): Future[Unit] = {
s"${constants.jobPath}${jobID}".toFile.delete(swallowIOExceptions = true)
updateJobLogAndExecution(jobID) // Remove the job from the current job map
for {
_ <- jobDao.removeJob(job.jobID)
_ <- userDao.removeJob(job.jobID)
_ <- jobDao.removeJob(jobID)
_ <- userDao.removeJob(jobID)
} yield ()
}

private def sendClearEvent(job: Job): Unit = {
val foundWatchers = job.watchList.flatMap(userID => wsActorCache.get(userID): Option[List[ActorRef]])
foundWatchers.flatten.foreach(_ ! ClearJob(job.jobID))
}

private def updateJobState(job: Job): Future[Job] = {
// Push the updated job into the current jobs
currentJobs = currentJobs.updated(job.jobID, job)
Expand Down Expand Up @@ -344,7 +346,7 @@ class JobActor @Inject()(
case None => NotUsed
}

case Delete(jobID, userIDOption) =>
case Delete(jobID, userID) =>
val verbose = true // just switch this on / off for logging
if (verbose) log.info(s"[JobActor[$jobActorNumber].Delete] Received Delete for $jobID")
this
Expand All @@ -361,21 +363,19 @@ class JobActor @Inject()(
.foreach {
case Some(job) =>
// Delete the job when the user is the owner and clear it otherwise
if (userIDOption.isEmpty || userIDOption.get == job.ownerID) {
if (userID == job.ownerID) {
if (verbose)
log.info(s"[JobActor[$jobActorNumber].Delete] Found Job with ${job.jobID} - starting file deletion")
delete(job)
job.clusterData.foreach(clusterData => Qdel.run(clusterData.sgeID))
sendClearEvent(job)
deleteJob(job.jobID)
} else {
userIDOption match {
case Some(userID) =>
self ! RemoveFromWatchlist(jobID, userID)
case None => NotUsed
}
self ! RemoveFromWatchlist(jobID, userID)
}
case None => NotUsed
}

case DeleteList(jobList, _) => val _ = Qdel.runMultiple(jobList)
case DeleteList(jobList) => jobList.foreach(deleteJob)

case CheckIPHash(jobID) =>
getCurrentJob(jobID).foreach {
Expand Down Expand Up @@ -568,7 +568,7 @@ class JobActor @Inject()(
// Now we can update the JobState and remove it, once the update has completed
updateJobState(job).map { job =>
//Remove the job from the jobActor
removeJob(job.jobID)
updateJobLogAndExecution(job.jobID)
// Tell the user that their job finished via eMail (can be either failed or done)
sendJobUpdateMail(job)
}
Expand Down Expand Up @@ -653,10 +653,10 @@ object JobActor {
case class RemoveFromWatchlist(jobID: String, userID: String)

// JobActor is requested to Delete the job
case class Delete(jobID: String, userID: Option[String] = None)
case class Delete(jobID: String, userID: String)

// Delete multiple jobs at once
case class DeleteList(jobIDs: List[String], userID: Option[String] = None)
case class DeleteList(jobIDs: List[String])

// Job Controller receives a job state change from the SGE or from any other valid source
case class JobStateChanged(jobID: String, jobState: JobState)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class SubmissionController @Inject()(

def delete(jobID: String): Action[AnyContent] = userAction { implicit request =>
logger.info("Delete Action in JobController reached")
jobActorAccess.sendToJobActor(jobID, Delete(jobID, Some(request.user.userID)))
jobActorAccess.sendToJobActor(jobID, Delete(jobID, request.user.userID))
NoContent
}

Expand Down