-
Notifications
You must be signed in to change notification settings - Fork 3
#598 Add '--skip-locked' option so that already running jobs are skipped instead of error thrown #600
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughThe changes introduce a new configuration and command-line option, Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant CLI
participant Config
participant TaskRunner
participant LockManager
User->>CLI: Runs pipeline with --skip-locked
CLI->>Config: Sets skipLocked = true
TaskRunner->>LockManager: Try acquire lock
alt Lock acquired
LockManager-->>TaskRunner: Success
TaskRunner->>TaskRunner: Run job
else Lock not acquired & skipLocked=true
LockManager-->>TaskRunner: Failure
TaskRunner->>TaskRunner: Throw AlreadyRunningSkipException
TaskRunner->>TaskRunner: Mark job as Skipped
else Lock not acquired & skipLocked=false
LockManager-->>TaskRunner: Failure
TaskRunner->>TaskRunner: Throw error, mark job as Failed
end
Assessment against linked issues
Poem
📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
⏰ Context from checks skipped due to timeout of 90000ms (8)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (1)
README.md (1)
3397-3397
: Clarify boolean flag usage for--use-lock
.Consider updating the Example cell to reflect that this option accepts both
true
andfalse
, e.g.:| --use-lock |
--use-lock <true|false>
| If true (default)… |This will make its usage more explicit.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
README.md
(1 hunks)pramen/core/src/main/resources/reference.conf
(1 hunks)pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala
(4 hunks)pramen/core/src/main/scala/za/co/absa/pramen/core/cmd/CmdLineConfig.scala
(3 hunks)pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/AlreadyRunningSkipException.scala
(1 hunks)pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
(2 hunks)pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala
(2 hunks)pramen/core/src/test/scala/za/co/absa/pramen/core/cmd/CmdLineConfigSuite.scala
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
- GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
- GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
- GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
- GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
- GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
- GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
- GitHub Check: Test Coverage on Scala 2.12.18
🔇 Additional comments (15)
pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala (2)
31-31
: LGTM! Consistent parameter addition.The new
skipLocked
parameter follows the same pattern as other Boolean parameters in the method signature with a proper default value.
49-49
: LGTM! Proper parameter passing.The parameter is correctly passed through to the
RuntimeConfig
constructor, maintaining consistency with the method signature.pramen/core/src/main/resources/reference.conf (1)
96-98
: LGTM! Well-documented configuration property.The new
runtime.skip.locked
configuration property is:
- Properly documented with clear explanation of behavior
- Has a sensible default value of
false
(maintaining current behavior)- Follows the existing configuration structure and naming conventions
pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/AlreadyRunningSkipException.scala (1)
19-20
: LGTM! Well-designed exception class.The
AlreadyRunningSkipException
class is properly implemented:
- Extends
RuntimeException
which is appropriate for this use case- Has a clear, descriptive message
- Includes proper documentation explaining its purpose
- Simple and focused design for graceful job skipping
pramen/core/src/test/scala/za/co/absa/pramen/core/cmd/CmdLineConfigSuite.scala (2)
292-299
: LGTM! Comprehensive positive test case.The test properly verifies that when
--skip-locked
is specified:
- The configuration contains the
SKIP_LOCKED
path- The value is correctly set to
true
- Follows the established testing pattern in the suite
301-306
: LGTM! Important negative test case.The test ensures that when
--skip-locked
is not specified, the configuration doesn't contain theSKIP_LOCKED
path, which verifies the optional nature of the flag and prevents false positives.README.md (1)
3397-3398
: Options documentation matches implementation.The
--use-lock
and--skip-locked
flags are correctly described and align with the new functionality introduced in the PR.pramen/core/src/main/scala/za/co/absa/pramen/core/cmd/CmdLineConfig.scala (3)
41-41
: LGTM: Consistent field addition.The
skipLocked
field follows the same pattern as other optional Boolean configuration fields in the case class.
114-116
: LGTM: Proper configuration application.The implementation correctly applies the command line value to the configuration using the same pattern as other boolean options.
213-216
: LGTM: Well-defined command line option.The option implementation is correct:
- Uses
Unit
type for flag-style behavior- Sets
skipLocked
toSome(true)
when present- Provides clear documentation about the feature's purpose
pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala (3)
36-36
: LGTM: Clear field documentation.The field is well-documented with a clear explanation of its purpose in the context of job locking behavior.
146-146
: LGTM: Proper config value reading.The implementation correctly reads the boolean configuration value using the standard
getBoolean
method.
169-169
: LGTM: Backward-compatible default value.Setting the default to
false
maintains backward compatibility, ensuring existing behavior is preserved when the flag is not explicitly set.pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala (2)
450-460
: LGTM: Proper exception handling with appropriate status.The exception handling correctly:
- Specifically checks for
AlreadyRunningSkipException
instances- Returns
RunStatus.Skipped
with a clear message and warning flag- Preserves all necessary task result information including dependency warnings
- Maintains consistency with other exception handling patterns in the method
336-344
:✅ Verification successful
LGTM: Well-implemented conditional lock behavior.
The logic correctly implements the skip-locked feature:
- Only applies when locking is enabled (
runtimeConfig.useLocks
)- Checks if lock acquisition fails (
!lock.tryAcquire()
)- Conditionally throws the appropriate exception based on
skipLocked
flag- Maintains backward compatibility with existing error behavior
Verify that the
AlreadyRunningSkipException
class exists and is properly imported:
🏁 Script executed:
#!/bin/bash # Description: Verify AlreadyRunningSkipException class exists # Expected: Find the exception class definition and import statement # Search for the exception class definition fd -e scala AlreadyRunningSkipException --exec cat {} # Search for import statements in this file rg -A 2 -B 2 "AlreadyRunningSkipException" --type scalaLength of output: 2798
✅ Exception Verified: AlreadyRunningSkipException
- The
AlreadyRunningSkipException
class is defined in
pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/AlreadyRunningSkipException.scala
within the same package asTaskRunnerBase.scala
, so no import is required.- Lock‐acquisition logic and skip‐locked behavior remain correctly implemented.
LGTM.
@@ -59,6 +60,7 @@ object RuntimeConfig { | |||
val RUN_TABLES = "pramen.runtime.run.tables" | |||
val UNDERCOVER = "pramen.undercover" | |||
val USE_LOCK = "pramen.use.lock" | |||
val SKIP_LOCKED = "pramen.runtime.skip.locked" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Verification agent
🧩 Analysis chain
Verify configuration key consistency.
The configuration key looks correct, but ensure it matches the key defined in reference.conf
.
🏁 Script executed:
#!/bin/bash
# Description: Verify the configuration key exists in reference.conf
# Expected: Find the pramen.runtime.skip.locked key with default value false
fd -e conf reference.conf --exec grep -n "pramen.runtime.skip.locked"
Length of output: 69
🏁 Script executed:
#!/bin/bash
# Locate reference.conf in the repository
file=$(fd -t f -I reference.conf)
echo "Found reference.conf: ${file:-<none>}"
if [[ -n "$file" ]]; then
echo "Searching for pramen.runtime.skip.locked in $file"
grep -n "pramen.runtime.skip.locked" "$file" || echo "Key not found in $file"
else
echo "Error: reference.conf not found"
fi
Length of output: 1713
🏁 Script executed:
#!/bin/bash
echo "Locating all reference.conf files..."
files=$(fd -t f -I reference.conf)
if [[ -z "$files" ]]; then
echo "Error: No reference.conf files found."
exit 1
fi
for f in $files; do
echo "Checking $f"
grep -Hn "pramen.runtime.skip.locked" "$f" || echo "Key 'pramen.runtime.skip.locked' not found in $f"
done
Length of output: 1682
Missing configuration key in reference.conf
The constant SKIP_LOCKED = "pramen.runtime.skip.locked"
isn’t defined in any of your reference.conf
files. Please add this key (with a default value) so that the setting is recognized at runtime.
Locations to update:
- pramen/core/src/main/resources/reference.conf
- (optionally) pramen/core/src/test/resources/reference.conf if tests load this file
- pramen/extras/src/main/resources/reference.conf if the “extras” module uses it
Suggested diff for pramen/core/src/main/resources/reference.conf
:
@@ # Runtime settings
-# (other pramen.runtime.* entries)
+pramen.runtime.skip.locked = false # Skip locked resources during runtime
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In
pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala
at line 63, the configuration key SKIP_LOCKED is defined as
"pramen.runtime.skip.locked" but this key is missing from all reference.conf
files. To fix this, add the key "pramen.runtime.skip.locked" with a default
value (e.g., false) to pramen/core/src/main/resources/reference.conf, and
optionally to pramen/core/src/test/resources/reference.conf and
pramen/extras/src/main/resources/reference.conf if those modules use it,
ensuring the runtime configuration recognizes this setting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default configuration is there in reference.conf. The line is 'runtime.skip.locked = false'. It is under 'pramen { }' section.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Search for the nested configuration structure in reference.conf files
echo "Searching for runtime.skip.locked in pramen section..."
files=$(fd -t f -I reference.conf)
for f in $files; do
echo "=== Checking $f ==="
# Look for the nested structure - search for 'runtime.skip.locked'
grep -A5 -B5 "runtime\.skip\.locked" "$f" || echo "Pattern 'runtime.skip.locked' not found"
# Also search for just 'skip.locked' to catch variations
grep -A5 -B5 "skip\.locked" "$f" || echo "Pattern 'skip.locked' not found"
# Show the pramen section context
echo "--- Showing pramen section context ---"
sed -n '/^pramen {/,/^}/p' "$f" | head -20
done
Length of output: 3819
@yruslan You're absolutely right, and I apologize for the confusion! The configuration key runtime.skip.locked = false
is indeed present in pramen/core/src/main/resources/reference.conf
under the pramen { }
section, exactly as you stated.
I can see from the search results that it's properly configured:
pramen {
# ...
use.lock = true
# If true, jobs that are already running and hold a lock will be skipped instead of throwing an error
runtime.skip.locked = false
# ...
}
My initial analysis was incorrect - I was searching for the fully qualified dotted notation "pramen.runtime.skip.locked"
but HOCON configuration files use nested structures. When nested under the pramen { }
section as runtime.skip.locked = false
, it's correctly accessible via the flat key "pramen.runtime.skip.locked"
that your RuntimeConfig.fromConfig
method uses.
Thank you for the correction! The configuration setup is working as intended.
🐰
(\ /)
( ..)
o(")(") learns something new
🧬 Code Graph Analysis Results
Relevant snippet from pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala
(lines 1-153)
This file defines the RuntimeConfig
case class and its companion object which handles loading runtime configuration from a Typesafe Config
object.
-
The
RuntimeConfig
case class contains many fields controlling runtime behavior, including:skipLocked: Boolean
— whether to skip jobs that are already running and holding a lock.useLocks: Boolean
— whether to acquire locks before writing.- Other flags like
isDryRun
,isRerun
,isUndercover
,checkOnlyLateData
,checkOnlyNewData
, etc. runDate
andrunDateTo
for specifying the date range.parallelTasks
,stopSparkSession
,allowEmptyPipeline
,alwaysAddBatchIdColumn
,historicalRunMode
, andsparkAppDescriptionTemplate
.
-
The companion object defines string constants for config keys, including:
SKIP_LOCKED = "pramen.runtime.skip.locked"
USE_LOCK = "pramen.use.lock"
- Other keys like
DRY_RUN
,IS_RERUN
,RUN_TABLES
, etc.
-
The
fromConfig(conf: Config): RuntimeConfig
method reads the configuration from the providedConfig
object:- Reads
skipLocked
usingconf.getBoolean(SKIP_LOCKED)
. - Reads
useLocks
usingconf.getBoolean(USE_LOCK)
. - Reads other flags similarly.
- Uses helper methods from
ConfigUtils
to read optional values. - Converts date strings to
LocalDate
usingconvertStrToDate
. - Throws an exception if
parallelTasks
is zero or negative. - Returns a fully populated
RuntimeConfig
instance.
- Reads
-
The
default
method returns aRuntimeConfig
instance with hardcoded default values, including:skipLocked = false
useLocks = true
isDryRun = false
isRerun = false
parallelTasks = 1
stopSparkSession = true
historicalRunMode = RunMode.CheckUpdates
- and others.
This file is the primary place where the runtime configuration is loaded and interpreted, including the skipLocked
flag mentioned in the user comment.
Relevant snippet from pramen/core/src/main/scala/za/co/absa/pramen/core/utils/ConfigUtils.scala
(lines 30-80 approx)
This utility object provides helper methods to safely read optional configuration values from a Typesafe Config
object:
getOptionBoolean(conf: Config, path: String): Option[Boolean]
— returnsSome(boolean)
if the path exists, elseNone
.getOptionString(conf: Config, path: String): Option[String]
— returnsSome(string)
if the path exists, elseNone
.getOptListStrings(conf: Config, path: String): Seq[String]
— returns a list of strings if the path exists, else empty list.
These methods are used in RuntimeConfig.fromConfig
to read optional config keys safely.
Relevant snippet from pramen/api/src/main/scala/za/co/absa/pramen/api/RunMode.scala
(lines 20-45)
Defines the RunMode
sealed trait and its three case objects:
SkipAlreadyRan
("fill_gaps")CheckUpdates
("check_updates")ForceRun
("force")
Includes a method fromString
to parse a string into a RunMode
.
This is relevant because RuntimeConfig
uses RunMode
for the historicalRunMode
field and parses it from config key "pramen.runtime.run.mode"
.
Summary
The key relevant information for the user comment about the default configuration line runtime.skip.locked = false
under the pramen { }
section is:
- The
RuntimeConfig
object reads theskipLocked
flag from the config key"pramen.runtime.skip.locked"
. - The default value for
skipLocked
in thedefault
method isfalse
. - The config key is expected under the
pramen
namespace, consistent with the user comment. - The
fromConfig
method reads this flag directly from the providedConfig
object, which would typically be loaded fromreference.conf
or other config files.
[no additional relevant snippets found]
Unit Test Coverage
Files
|
Closes #598
Summary by CodeRabbit