Skip to content

#601 Acquire lock at the start of running a job instead of just on write. #602

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.exceptions.{FatalErrorWrapper, ReasonException}
import za.co.absa.pramen.core.journal.Journal
import za.co.absa.pramen.core.journal.model.TaskCompleted
import za.co.absa.pramen.core.lock.TokenLockFactory
import za.co.absa.pramen.core.lock.{TokenLockFactory, TokenLockFactoryAllow}
import za.co.absa.pramen.core.metastore.MetaTableStats
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.pipeline.JobPreRunStatus._
Expand Down Expand Up @@ -136,7 +136,7 @@ abstract class TaskRunnerBase(conf: Config,
try {
ThreadUtils.runWithTimeout(Duration(timeout, TimeUnit.SECONDS)) {
log.info(s"Running ${task.job.name} with the hard timeout = $timeout seconds.")
runStatus = doValidateAndRunTask(task)
runStatus = doValidateOrSkipTask(task)
}
runStatus
} catch {
Expand All @@ -145,24 +145,44 @@ abstract class TaskRunnerBase(conf: Config,
}
case Some(timeout) =>
log.error(s"Incorrect timeout for the task: ${task.job.name}. Should be bigger than zero, got: $timeout.")
doValidateAndRunTask(task)
doValidateOrSkipTask(task)
case None =>
doValidateAndRunTask(task)
doValidateOrSkipTask(task)
}
}

protected def doValidateAndRunTask(task: Task): RunStatus = {
protected def doValidateOrSkipTask(task: Task): RunStatus = {
val started = Instant.now()

task.reason match {
case TaskRunReason.Skip(reason) =>
// Tasks already marked to be skipped because of strong date constraints (e.g. an attempt to run before the minimum date) are skipped here with a warning
skipTask(task, reason, isWarning = true)
case _ =>
val result: TaskResult = validate(task, started) match {
case Left(failedResult) => failedResult
case Right(validationResult) => run(task, started, validationResult)
case _ => doValidateAndRunTask(task, started)
}
}

private def doValidateAndRunTask(task: Task, started: Instant): RunStatus = {
val isTransient = task.job.outputTable.format.isTransient
val taskLockFactory = if (isTransient) new TokenLockFactoryAllow else lockFactory
Copy link
Preview

Copilot AI May 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] If TokenLockFactoryAllow is stateless, consider reusing an instance instead of creating a new one for each transient task to optimize resource usage.

Copilot uses AI. Check for mistakes.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. Will do it in one of the next PRs.

val lock = taskLockFactory.getLock(getTokenName(task))

try {
if (!lock.tryAcquire()) {
if (runtimeConfig.skipLocked) {
return skipTask(task, "Another instance is already running", isWarning = true)
} else {
throw new IllegalStateException(s"Another instance is already running for ${task.job.outputTable.name} for ${task.infoDate}")
}
onTaskCompletion(task, result, isLazy = false)
}

val result: TaskResult = validate(task, started) match {
case Left(failedResult) => failedResult
case Right(validationResult) => run(task, started, validationResult)
}
onTaskCompletion(task, result, isLazy = false)
} finally {
lock.release()
}
}

Expand Down Expand Up @@ -329,20 +349,9 @@ abstract class TaskRunnerBase(conf: Config,
private[core] def run(task: Task, started: Instant, validationResult: JobPreRunResult): TaskResult = {
val isTransient = task.job.outputTable.format.isTransient
val isRawFileBased = task.job.outputTable.format.isRaw
val lock = lockFactory.getLock(getTokenName(task))

val attempt = try {
Try {
if (!isTransient && runtimeConfig.useLocks) {
if (!lock.tryAcquire()) {
if (runtimeConfig.skipLocked) {
throw new AlreadyRunningSkipException()
} else {
throw new IllegalStateException(s"Another instance is already running for ${task.job.outputTable.name} for ${task.infoDate}")
}
}
}

val recordCountOldOpt = bookkeeper.getLatestDataChunk(task.job.outputTable.name, task.infoDate, task.infoDate).map(_.outputRecordCount)

val runResult = task.job.run(task.infoDate, task.reason, conf)
Expand Down Expand Up @@ -440,24 +449,12 @@ abstract class TaskRunnerBase(conf: Config,
} finally {
if (!isTransient) {
task.job.metastore.rollbackIncrementalTables()
lock.release()
}
}

attempt match {
case Success(result) =>
result
case Failure(ex) if ex.isInstanceOf[AlreadyRunningSkipException] =>
TaskResult(task.job.taskDef,
RunStatus.Skipped("Another instance is already running", isWarning = true),
getRunInfo(task.infoDate, started),
applicationId,
isTransient,
isRawFileBased,
Nil,
validationResult.dependencyWarnings,
Seq.empty,
task.job.operation.extraOptions)
case Failure(ex) =>
TaskResult(task.job.taskDef,
RunStatus.Failed(ex),
Expand Down