Skip to content

Commit

Permalink
closes #1151: delete multiple jobs from sge at once (#1159)
Browse files Browse the repository at this point in the history
* delete multiple jobs from sge at once

* simplify job sweep deletion
- qdel is not necessary on DB sweep
- no need to fetch each job when batch-deleting old jobs (the only drawback is that watchers will need to refresh in order to see the job removed)

* small fixes for job sweep

Co-authored-by: Felix Gabler <felix.gabler98@gmail.com>

Co-authored-by: Felix Gabler <felix.gabler98@gmail.com>
Co-authored-by: Vikram Alva <vikram.alva@gmail.com>
  • Loading branch information
3 people authored Jan 3, 2021
1 parent cd12e9e commit 9815fa9
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 32 deletions.
12 changes: 12 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Set the default behavior, in case people don't have core.autocrlf set.
* text=auto

# Force LF line endings on checkout for every file whose name contains a dot.
# Binary file types must be listed below, otherwise they are normalized as text.
*.* text eol=lf

# Denote all files that are truly binary and should not be modified.
*.png binary
*.ico binary
*.jpg binary
*.exe binary
*.msi binary
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ conf/.env
frontend/.env*
frontend/.yarn/cache
.bsp
*.iml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import de.proteinevolution.auth.models.MailTemplate.OldAccountEmail
import de.proteinevolution.backend.actors.DatabaseMonitor.{DeleteOldJobs, DeleteOldUsers}
import de.proteinevolution.backend.dao.BackendDao
import de.proteinevolution.common.models.ConstantsV2
import de.proteinevolution.jobs.actors.JobActor.Delete
import de.proteinevolution.jobs.actors.JobActor.DeleteList
import de.proteinevolution.jobs.dao.JobDao
import de.proteinevolution.jobs.services.JobActorAccess
import javax.inject.{Inject, Singleton}
Expand Down Expand Up @@ -58,7 +58,7 @@ final class DatabaseMonitor @Inject()(
// interval calling the job deletion method automatically
private val jobDeletionScheduler: Cancellable = {
// scheduler should use the system dispatcher
context.system.scheduler.scheduleWithFixedDelay(constants.jobDeletionDelay, constants.userDeletionInterval, self, DeleteOldJobs)(
context.system.scheduler.scheduleWithFixedDelay(constants.jobDeletionDelay, constants.jobDeletionInterval, self, DeleteOldJobs)(
context.system.dispatcher
)
}
Expand Down Expand Up @@ -119,13 +119,12 @@ final class DatabaseMonitor @Inject()(
}

private def deleteOldJobs(): Unit = {
log.info("[Job Deletion] finding old jobs...")
jobDao.findOldJobs().foreach { jobList =>
log.info(s"[Job Deletion] found ${jobList.length} jobs for deletion. Sending to job actors.")
jobList.foreach { job =>
// Just send a deletion request to the job actor responsible for the job
jobActorAccess.sendToJobActor(job.jobID, Delete(job.jobID))
}
log.info("[Job Deletion] Cleaning up old jobs")
for {
jobList <- jobDao.findOldJobs()
jobIds = jobList.collect(_.jobID)
} {
context.system.eventStream.publish(DeleteList(jobIds))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ final class BackendController @Inject()(
extends ToolkitController(cc)
with Logging {

//TODO currently working mithril routes for the backend
def index: Action[AnyContent] = userAction { implicit request =>
if (request.user.isSuperuser) {
NoCache(Ok(List("Index Page").asJson))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ import de.proteinevolution.tel.execution.{ExecutionContext, WrapperExecutionFact
import de.proteinevolution.tel.runscripts.Runscript.Evaluation
import de.proteinevolution.tel.runscripts._
import javax.inject.Inject
import play.api.Configuration
import play.api.cache.{NamedCache, SyncCacheApi}
import play.api.libs.mailer.MailerClient
import play.api.Configuration
import reactivemongo.api.bson.BSONDateTime

import scala.concurrent.Future
Expand Down Expand Up @@ -152,7 +152,7 @@ class JobActor @Inject()(
}
}

private def removeJob(jobID: String): Unit = {
private def updateJobLogAndExecution(jobID: String): Unit = {
// Save Job Event Log to the collection and remove it from the map afterwards
if (currentJobLogs.contains(jobID)) {
currentJobs = currentJobs.filter(_._1 != jobID)
Expand All @@ -169,18 +169,20 @@ class JobActor @Inject()(
}
}

private def delete(job: Job): Future[Unit] = {
s"${constants.jobPath}${job.jobID}".toFile.delete(swallowIOExceptions = true)
removeJob(job.jobID) // Remove the job from the current job map
val foundWatchers = job.watchList.flatMap(userID => wsActorCache.get(userID): Option[List[ActorRef]])
foundWatchers.flatten.foreach(_ ! ClearJob(job.jobID))
job.clusterData.foreach(clusterData => Qdel.run(clusterData.sgeID))
private def deleteJob(jobID: String): Future[Unit] = {
s"${constants.jobPath}${jobID}".toFile.delete(swallowIOExceptions = true)
updateJobLogAndExecution(jobID) // Remove the job from the current job map
for {
_ <- jobDao.removeJob(job.jobID)
_ <- userDao.removeJob(job.jobID)
_ <- jobDao.removeJob(jobID)
_ <- userDao.removeJob(jobID)
} yield ()
}

/**
 * Sends a `ClearJob` message for the given job to every actor that is cached in
 * `wsActorCache` under one of the user IDs on the job's watch list.
 *
 * Users on the watch list without a cache entry are skipped.
 *
 * @param job the job whose watchers should be told to clear it
 */
private def sendClearEvent(job: Job): Unit = {
  // Look up all watcher actors first, then notify them.
  val watcherActors = job.watchList.flatMap { userID =>
    val cachedActors: Option[List[ActorRef]] = wsActorCache.get(userID)
    cachedActors.toList.flatten
  }
  watcherActors.foreach { watcher =>
    watcher ! ClearJob(job.jobID)
  }
}

private def updateJobState(job: Job): Future[Job] = {
// Push the updated job into the current jobs
currentJobs = currentJobs.updated(job.jobID, job)
Expand Down Expand Up @@ -244,7 +246,8 @@ class JobActor @Inject()(
}

override def preStart(): Unit = {
val _ = context.system.eventStream.subscribe(self, classOf[PolledJobs])
context.system.eventStream.subscribe(self, classOf[PolledJobs])
val _ = context.system.eventStream.subscribe(self, classOf[DeleteList])
}

override def postStop(): Unit = {
Expand Down Expand Up @@ -343,7 +346,7 @@ class JobActor @Inject()(
case None => NotUsed
}

case Delete(jobID, userIDOption) =>
case Delete(jobID, userID) =>
val verbose = true // just switch this on / off for logging
if (verbose) log.info(s"[JobActor[$jobActorNumber].Delete] Received Delete for $jobID")
this
Expand All @@ -360,20 +363,23 @@ class JobActor @Inject()(
.foreach {
case Some(job) =>
// Delete the job when the user is the owner and clear it otherwise
if (userIDOption.isEmpty || userIDOption.get == job.ownerID) {
if (userID == job.ownerID) {
if (verbose)
log.info(s"[JobActor[$jobActorNumber].Delete] Found Job with ${job.jobID} - starting file deletion")
delete(job)
job.clusterData.foreach(clusterData => Qdel.run(clusterData.sgeID))
sendClearEvent(job)
deleteJob(job.jobID)
} else {
userIDOption match {
case Some(userID) =>
self ! RemoveFromWatchlist(jobID, userID)
case None => NotUsed
}
self ! RemoveFromWatchlist(jobID, userID)
}
case None => NotUsed
}

case DeleteList(jobList) =>
  // Batch deletion of jobs by ID: no Qdel call and no ClearJob notification is made here,
  // only deleteJob is invoked for each ID.
  log.info(s"[Job Deletion] Deleting ${jobList.length} jobs")
  // deleteJob is asynchronous (it returns a Future), so completion is reported only once
  // every deletion has actually finished; failures are logged instead of being dropped.
  // Future.traverse still invokes deleteJob for each ID eagerly on the calling thread.
  Future.traverse(jobList)(deleteJob).onComplete {
    case scala.util.Success(_) =>
      log.info("[Job Deletion] Finished cleaning up old jobs")
    case scala.util.Failure(error) =>
      log.error(s"[Job Deletion] Cleaning up old jobs failed: ${error.getMessage}")
  }

case CheckIPHash(jobID) =>
getCurrentJob(jobID).foreach {
case Some(job) =>
Expand Down Expand Up @@ -565,7 +571,7 @@ class JobActor @Inject()(
// Now we can update the JobState and remove it, once the update has completed
updateJobState(job).map { job =>
//Remove the job from the jobActor
removeJob(job.jobID)
updateJobLogAndExecution(job.jobID)
// Tell the user that their job finished via eMail (can be either failed or done)
sendJobUpdateMail(job)
}
Expand Down Expand Up @@ -650,7 +656,10 @@ object JobActor {
case class RemoveFromWatchlist(jobID: String, userID: String)

// JobActor is requested to Delete the job
case class Delete(jobID: String, userID: Option[String] = None)
case class Delete(jobID: String, userID: String)

// Delete multiple jobs at once, identified only by their job IDs.
// Unlike Delete, this message carries no user ID.
// It is published on the actor system's event stream, and JobActor subscribes to
// this class in preStart.
case class DeleteList(jobIDs: List[String])

// Job Controller receives a job state change from the SGE or from any other valid source
case class JobStateChanged(jobID: String, jobState: JobState)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class SubmissionController @Inject()(

def delete(jobID: String): Action[AnyContent] = userAction { implicit request =>
logger.info("Delete Action in JobController reached")
jobActorAccess.sendToJobActor(jobID, Delete(jobID, Some(request.user.userID)))
jobActorAccess.sendToJobActor(jobID, Delete(jobID, request.user.userID))
NoContent
}

Expand Down

0 comments on commit 9815fa9

Please sign in to comment.