-
Notifications
You must be signed in to change notification settings - Fork 3
#601 Acquire lock at the start of running a job instead of just on write. #602
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughThe changes remove the Changes
Sequence Diagram(s)sequenceDiagram
participant Scheduler
participant TaskRunnerBase
participant LockFactory
participant Task
Scheduler->>TaskRunnerBase: runTask(task)
TaskRunnerBase->>TaskRunnerBase: doValidateOrSkipTask(task)
alt Task should be skipped
TaskRunnerBase-->>Scheduler: Return skipped status
else
TaskRunnerBase->>LockFactory: acquireLock(task)
alt Lock acquired
TaskRunnerBase->>Task: validateAndRun(task)
TaskRunnerBase->>LockFactory: releaseLock(task)
TaskRunnerBase-->>Scheduler: Return run status
else Lock not acquired
alt Skipping locked tasks allowed
TaskRunnerBase-->>Scheduler: Return skipped status
else
TaskRunnerBase-->>Scheduler: Throw exception
end
end
end
Assessment against linked issues
Poem
📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (2)
💤 Files with no reviewable changes (1)
⏰ Context from checks skipped due to timeout of 90000ms (8)
🔇 Additional comments (4)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
Unit Test Coverage
Files
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR centralizes lock management by acquiring the lock at the start of a task run instead of during the write phase, ensuring more reliable concurrent execution handling.
- Consolidates task validation and lock acquisition logic into doValidateOrSkipTask.
- Removes redundant lock acquisition in the run method and eliminates the custom AlreadyRunningSkipException.
- Introduces the use of TokenLockFactoryAllow for transient tasks.
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
File | Description |
---|---|
pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala | Refactored task execution flow to centralize lock management and streamline task validation. |
pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/AlreadyRunningSkipException.scala | Removed obsolete exception in favor of unified error handling with standard exceptions. |
Comments suppressed due to low confidence (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala:154
- [nitpick] The public method is renamed to doValidateOrSkipTask while its private helper remains as doValidateAndRunTask, which may be confusing. Consider renaming the private method (e.g., to doRunTaskWithLock) for clarity.
protected def doValidateOrSkipTask(task: Task): RunStatus = {
pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala:175
- With the removal of AlreadyRunningSkipException, verify that downstream exception handling is updated to properly handle IllegalStateException for concurrently running tasks.
throw new IllegalStateException(s"Another instance is already running for ${task.job.outputTable.name} for ${task.infoDate}")
|
||
private def doValidateAndRunTask(task: Task, started: Instant): RunStatus = { | ||
val isTransient = task.job.outputTable.format.isTransient | ||
val taskLockFactory = if (isTransient) new TokenLockFactoryAllow else lockFactory |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] If TokenLockFactoryAllow is stateless, consider reusing an instance instead of creating a new one for each transient task to optimize resource usage.
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea. Will do it in one of next PRs
Closes #601
Summary by CodeRabbit
Refactor
Bug Fixes