#598 Add '--skip-locked' option so that already running jobs are skipped instead of error thrown #600

Merged
yruslan merged 3 commits into main from feature/598-skip-locked-jobs on May 28, 2025

Conversation

yruslan (Collaborator) commented May 28, 2025

Closes #598

Summary by CodeRabbit

  • New Features
    • Added a command-line option to skip jobs that are already running and holding a lock, enabling smoother concurrent job execution.
    • Introduced a configuration property to control skipping of locked jobs.
  • Documentation
    • Updated documentation to describe new execution options for managing job locking and skipping behavior.
  • Tests
    • Added tests to verify the new command-line option for skipping locked jobs.

coderabbitai bot commented May 28, 2025

Walkthrough

The changes introduce a new configuration and command-line option, --skip-locked, to the Pramen pipeline runner. When enabled, this feature allows jobs that cannot acquire a lock (because another process holds it) to be skipped rather than treated as errors. The implementation includes documentation updates, configuration changes, logic modifications to skip locked jobs, a new exception type, and corresponding tests.

Changes

| File(s) | Change Summary |
|---|---|
| README.md | Documented new --use-lock and --skip-locked command-line options. |
| .../reference.conf | Added pramen.runtime.skip.locked boolean config property (default false). |
| .../app/config/RuntimeConfig.scala, .../RuntimeConfigFactory.scala | Added skipLocked field to RuntimeConfig and test factory; updated config parsing and defaults. |
| .../cmd/CmdLineConfig.scala, .../cmd/CmdLineConfigSuite.scala | Added --skip-locked CLI option, config propagation, and test coverage for the new flag. |
| .../runner/task/AlreadyRunningSkipException.scala | Introduced new exception for skipping jobs already running. |
| .../runner/task/TaskRunnerBase.scala | Modified task runner to skip jobs if skipLocked is enabled and lock cannot be acquired. |

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant CLI
    participant Config
    participant TaskRunner
    participant LockManager

    User->>CLI: Runs pipeline with --skip-locked
    CLI->>Config: Sets skipLocked = true
    TaskRunner->>LockManager: Try acquire lock
    alt Lock acquired
        LockManager-->>TaskRunner: Success
        TaskRunner->>TaskRunner: Run job
    else Lock not acquired & skipLocked=true
        LockManager-->>TaskRunner: Failure
        TaskRunner->>TaskRunner: Throw AlreadyRunningSkipException
        TaskRunner->>TaskRunner: Mark job as Skipped
    else Lock not acquired & skipLocked=false
        LockManager-->>TaskRunner: Failure
        TaskRunner->>TaskRunner: Throw error, mark job as Failed
    end

Assessment against linked issues

Objective (issue #) Addressed Explanation
Add option to skip jobs holding locks (--skip-locked) (#598)
Jobs with lock held are skipped (not error) when option is enabled (#598)
Document and test the new skip-locked behavior (#598)

Poem

In the meadow where pipelines hop,
A lock might block, but we don’t stop!
With --skip-locked we leap ahead,
Skipping busy jobs instead.
Now tasks can run, both big and small,
No more waiting — we skip them all!
🐇✨


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 036e11b and 5547055.

📒 Files selected for processing (1)
  • README.md (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • README.md
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
  • GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
  • GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
  • GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
  • GitHub Check: Test Coverage on Scala 2.12.18

coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
README.md (1)

3397-3397: Clarify boolean flag usage for --use-lock.

Consider updating the Example cell to reflect that this option accepts both true and false, e.g.:

| --use-lock | --use-lock <true|false> | If true (default)… |

This will make its usage more explicit.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 085de96 and 036e11b.

📒 Files selected for processing (8)
  • README.md (1 hunks)
  • pramen/core/src/main/resources/reference.conf (1 hunks)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala (4 hunks)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/cmd/CmdLineConfig.scala (3 hunks)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/AlreadyRunningSkipException.scala (1 hunks)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala (2 hunks)
  • pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala (2 hunks)
  • pramen/core/src/test/scala/za/co/absa/pramen/core/cmd/CmdLineConfigSuite.scala (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
  • GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
  • GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
  • GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Test Coverage on Scala 2.12.18
🔇 Additional comments (15)
pramen/core/src/test/scala/za/co/absa/pramen/core/RuntimeConfigFactory.scala (2)

31-31: LGTM! Consistent parameter addition.

The new skipLocked parameter follows the same pattern as other Boolean parameters in the method signature with a proper default value.


49-49: LGTM! Proper parameter passing.

The parameter is correctly passed through to the RuntimeConfig constructor, maintaining consistency with the method signature.

pramen/core/src/main/resources/reference.conf (1)

96-98: LGTM! Well-documented configuration property.

The new runtime.skip.locked configuration property is:

  • Properly documented with clear explanation of behavior
  • Has a sensible default value of false (maintaining current behavior)
  • Follows the existing configuration structure and naming conventions
pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/AlreadyRunningSkipException.scala (1)

19-20: LGTM! Well-designed exception class.

The AlreadyRunningSkipException class is properly implemented:

  • Extends RuntimeException which is appropriate for this use case
  • Has a clear, descriptive message
  • Includes proper documentation explaining its purpose
  • Simple and focused design for graceful job skipping
pramen/core/src/test/scala/za/co/absa/pramen/core/cmd/CmdLineConfigSuite.scala (2)

292-299: LGTM! Comprehensive positive test case.

The test properly verifies that when --skip-locked is specified:

  • The configuration contains the SKIP_LOCKED path
  • The value is correctly set to true
  • Follows the established testing pattern in the suite

301-306: LGTM! Important negative test case.

The test ensures that when --skip-locked is not specified, the configuration doesn't contain the SKIP_LOCKED path, which verifies the optional nature of the flag and prevents false positives.

README.md (1)

3397-3398: Options documentation matches implementation.

The --use-lock and --skip-locked flags are correctly described and align with the new functionality introduced in the PR.

pramen/core/src/main/scala/za/co/absa/pramen/core/cmd/CmdLineConfig.scala (3)

41-41: LGTM: Consistent field addition.

The skipLocked field follows the same pattern as other optional Boolean configuration fields in the case class.


114-116: LGTM: Proper configuration application.

The implementation correctly applies the command line value to the configuration using the same pattern as other boolean options.


213-216: LGTM: Well-defined command line option.

The option implementation is correct:

  • Uses Unit type for flag-style behavior
  • Sets skipLocked to Some(true) when present
  • Provides clear documentation about the feature's purpose
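
The flag-style behavior described above can be sketched without any CLI library. This is a minimal illustration, not the code from CmdLineConfig.scala: the names CmdLine, SkipLockedFlagSketch, parse and applyTo are hypothetical, and a plain Map stands in for the real config object. It shows a presence-only flag that becomes Some(true), and a config override that is applied only when the flag was given, which is the behavior the two tests in CmdLineConfigSuite are described as checking.

```scala
// Hypothetical stand-in for the parsed command line; only skipLocked is modelled here.
final case class CmdLine(skipLocked: Option[Boolean] = None)

object SkipLockedFlagSketch {
  // Key name taken from the PR; the Map below is a simplified stand-in for a real config.
  val SkipLockedKey = "pramen.runtime.skip.locked"

  // Presence-only flag: seeing "--skip-locked" yields Some(true); absence leaves None.
  def parse(args: Seq[String]): CmdLine =
    args.foldLeft(CmdLine()) {
      case (cmd, "--skip-locked") => cmd.copy(skipLocked = Some(true))
      case (cmd, _)               => cmd // every other argument is ignored in this sketch
    }

  // Override the config only when the flag was given, so file defaults apply otherwise.
  def applyTo(cmd: CmdLine, conf: Map[String, Boolean]): Map[String, Boolean] =
    cmd.skipLocked.fold(conf)(value => conf.updated(SkipLockedKey, value))
}
```

Keeping the field as an Option rather than a plain Boolean is what lets "flag not given" stay distinguishable from "flag set to false", so the default from reference.conf is not overwritten.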
pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala (3)

36-36: LGTM: Clear field documentation.

The field is well-documented with a clear explanation of its purpose in the context of job locking behavior.


146-146: LGTM: Proper config value reading.

The implementation correctly reads the boolean configuration value using the standard getBoolean method.


169-169: LGTM: Backward-compatible default value.

Setting the default to false maintains backward compatibility, ensuring existing behavior is preserved when the flag is not explicitly set.

pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala (2)

450-460: LGTM: Proper exception handling with appropriate status.

The exception handling correctly:

  • Specifically checks for AlreadyRunningSkipException instances
  • Returns RunStatus.Skipped with a clear message and warning flag
  • Preserves all necessary task result information including dependency warnings
  • Maintains consistency with other exception handling patterns in the method
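
As a rough illustration of the handling listed above, the sketch below maps a task outcome to a status and treats the skip exception as a warning-level skip rather than a failure. The Status hierarchy and StatusMappingSketch are hypothetical simplifications; the real RunStatus and task result types in Pramen carry more information (for example dependency warnings) that is not modelled here.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical simplified statuses; the PR's RunStatus has more cases and fields.
sealed trait Status
case object Succeeded extends Status
final case class Skipped(message: String, isWarning: Boolean) extends Status
final case class Failed(error: Throwable) extends Status

// Same name as the exception introduced in the PR; the constructor shape is an assumption.
class AlreadyRunningSkipException(msg: String) extends RuntimeException(msg)

object StatusMappingSketch {
  // The specific skip exception must be matched before the generic failure case.
  def toStatus(attempt: Try[Unit]): Status = attempt match {
    case Success(_)                               => Succeeded
    case Failure(ex: AlreadyRunningSkipException) => Skipped(ex.getMessage, isWarning = true)
    case Failure(ex)                              => Failed(ex)
  }
}
```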

336-344:

✅ Verification successful

LGTM: Well-implemented conditional lock behavior.

The logic correctly implements the skip-locked feature:

  • Only applies when locking is enabled (runtimeConfig.useLocks)
  • Checks if lock acquisition fails (!lock.tryAcquire())
  • Conditionally throws the appropriate exception based on skipLocked flag
  • Maintains backward compatibility with existing error behavior
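
The four points above amount to a small decision tree, sketched here under stated assumptions. Lock, RuntimeSettings and LockGuardSketch are hypothetical minimal types, the release() call and the use of IllegalStateException for the non-skip case are assumptions made for illustration, and only the useLocks / tryAcquire / skipLocked ordering comes from the review text.

```scala
// Hypothetical minimal types; the real Pramen classes have more members.
trait Lock {
  def tryAcquire(): Boolean
  def release(): Unit
}

final case class RuntimeSettings(useLocks: Boolean, skipLocked: Boolean)

// Same name as the exception introduced in the PR; the constructor shape is an assumption.
class AlreadyRunningSkipException(msg: String) extends RuntimeException(msg)

object LockGuardSketch {
  def runWithLock[T](lock: Lock, settings: RuntimeSettings)(job: => T): T = {
    if (!settings.useLocks) {
      job // locking disabled: skipLocked has no effect
    } else if (lock.tryAcquire()) {
      try job finally lock.release() // assumed cleanup, not described in the review
    } else if (settings.skipLocked) {
      // Signals "skip this job" to the caller instead of failing the run.
      throw new AlreadyRunningSkipException("Another instance holds the lock; skipping the job.")
    } else {
      // Stand-in for the pre-existing error behavior; the real exception type is not shown in the thread.
      throw new IllegalStateException("Another instance holds the lock.")
    }
  }
}
```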

Verify that the AlreadyRunningSkipException class exists and is properly imported:


🏁 Script executed:

#!/bin/bash
# Description: Verify AlreadyRunningSkipException class exists
# Expected: Find the exception class definition and import statement

# Search for the exception class definition
fd -e scala AlreadyRunningSkipException --exec cat {}

# Search for import statements in this file
rg -A 2 -B 2 "AlreadyRunningSkipException" --type scala

Length of output: 2798


✅ Exception Verified: AlreadyRunningSkipException

  • The AlreadyRunningSkipException class is defined in
    pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/AlreadyRunningSkipException.scala
    within the same package as TaskRunnerBase.scala, so no import is required.
  • Lock‐acquisition logic and skip‐locked behavior remain correctly implemented.

LGTM.

@@ -59,6 +60,7 @@ object RuntimeConfig {
val RUN_TABLES = "pramen.runtime.run.tables"
val UNDERCOVER = "pramen.undercover"
val USE_LOCK = "pramen.use.lock"
val SKIP_LOCKED = "pramen.runtime.skip.locked"

coderabbitai bot commented May 28, 2025

💡 Verification agent

🧩 Analysis chain

Verify configuration key consistency.

The configuration key looks correct, but ensure it matches the key defined in reference.conf.


🏁 Script executed:

#!/bin/bash
# Description: Verify the configuration key exists in reference.conf
# Expected: Find the pramen.runtime.skip.locked key with default value false

fd -e conf reference.conf --exec grep -n "pramen.runtime.skip.locked"

Length of output: 69


🏁 Script executed:

#!/bin/bash
# Locate reference.conf in the repository
file=$(fd -t f -I reference.conf)
echo "Found reference.conf: ${file:-<none>}"

if [[ -n "$file" ]]; then
  echo "Searching for pramen.runtime.skip.locked in $file"
  grep -n "pramen.runtime.skip.locked" "$file" || echo "Key not found in $file"
else
  echo "Error: reference.conf not found"
fi

Length of output: 1713


🏁 Script executed:

#!/bin/bash
echo "Locating all reference.conf files..."
files=$(fd -t f -I reference.conf)

if [[ -z "$files" ]]; then
  echo "Error: No reference.conf files found."
  exit 1
fi

for f in $files; do
  echo "Checking $f"
  grep -Hn "pramen.runtime.skip.locked" "$f" || echo "Key 'pramen.runtime.skip.locked' not found in $f"
done

Length of output: 1682


Missing configuration key in reference.conf

The constant SKIP_LOCKED = "pramen.runtime.skip.locked" isn’t defined in any of your reference.conf files. Please add this key (with a default value) so that the setting is recognized at runtime.

Locations to update:

  • pramen/core/src/main/resources/reference.conf
  • (optionally) pramen/core/src/test/resources/reference.conf if tests load this file
  • pramen/extras/src/main/resources/reference.conf if the “extras” module uses it

Suggested diff for pramen/core/src/main/resources/reference.conf:

@@ # Runtime settings
-​# (other pramen.runtime.* entries)
+pramen.runtime.skip.locked = false  # Skip locked resources during runtime

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In
pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala
at line 63, the configuration key SKIP_LOCKED is defined as
"pramen.runtime.skip.locked" but this key is missing from all reference.conf
files. To fix this, add the key "pramen.runtime.skip.locked" with a default
value (e.g., false) to pramen/core/src/main/resources/reference.conf, and
optionally to pramen/core/src/test/resources/reference.conf and
pramen/extras/src/main/resources/reference.conf if those modules use it,
ensuring the runtime configuration recognizes this setting.

yruslan (Collaborator, Author) replied:

The default configuration is there in reference.conf. The line is 'runtime.skip.locked = false'. It is under 'pramen { }' section.

coderabbitai bot replied:

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for the nested configuration structure in reference.conf files
echo "Searching for runtime.skip.locked in pramen section..."

files=$(fd -t f -I reference.conf)

for f in $files; do
  echo "=== Checking $f ==="
  
  # Look for the nested structure - search for 'runtime.skip.locked' 
  grep -A5 -B5 "runtime\.skip\.locked" "$f" || echo "Pattern 'runtime.skip.locked' not found"
  
  # Also search for just 'skip.locked' to catch variations
  grep -A5 -B5 "skip\.locked" "$f" || echo "Pattern 'skip.locked' not found"
  
  # Show the pramen section context
  echo "--- Showing pramen section context ---"
  sed -n '/^pramen {/,/^}/p' "$f" | head -20
  
done

Length of output: 3819


@yruslan You're absolutely right, and I apologize for the confusion! The configuration key runtime.skip.locked = false is indeed present in pramen/core/src/main/resources/reference.conf under the pramen { } section, exactly as you stated.

I can see from the search results that it's properly configured:

pramen {
  # ...
  use.lock = true
  
  # If true, jobs that are already running and hold a lock will be skipped instead of throwing an error
  runtime.skip.locked = false
  # ...
}

My initial analysis was incorrect: I searched the file for the literal string "pramen.runtime.skip.locked", but this reference.conf nests the key inside the pramen { } block, so that string never appears verbatim. HOCON treats runtime.skip.locked = false inside pramen { } as the path "pramen.runtime.skip.locked", which is the key that your RuntimeConfig.fromConfig method reads.

Thank you for the correction! The configuration setup is working as intended.
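
The equivalence discussed in this thread can be checked directly with the Typesafe Config library. The sketch below is an illustration only: the HoconPathSketch object is hypothetical, the dependency line is a scala-cli directive with an assumed library version, and the nested snippet is the fragment quoted above rather than the full reference.conf.

```scala
//> using dep "com.typesafe:config:1.4.3"
import com.typesafe.config.ConfigFactory

object HoconPathSketch {
  // Nested form, as quoted from reference.conf in this thread.
  val nested = ConfigFactory.parseString(
    """pramen {
      |  use.lock = true
      |  runtime.skip.locked = false
      |}""".stripMargin)

  // Flat, fully qualified form of the same setting.
  val flat = ConfigFactory.parseString("pramen.runtime.skip.locked = false")

  def main(args: Array[String]): Unit = {
    // Both forms resolve to the same path, even though a text search for the
    // full dotted key only matches the flat form.
    println(nested.hasPath("pramen.runtime.skip.locked"))
    println(nested.getBoolean("pramen.runtime.skip.locked") == flat.getBoolean("pramen.runtime.skip.locked"))
  }
}
```

This is why a grep for the full dotted key can come back empty while conf.getBoolean on that same key still succeeds.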

🐰
(\ /)
( ..)
o
(")(") learns something new


🧬 Code Graph Analysis Results

Relevant snippet from pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala (lines 1-153)

This file defines the RuntimeConfig case class and its companion object which handles loading runtime configuration from a Typesafe Config object.

  • The RuntimeConfig case class contains many fields controlling runtime behavior, including:

    • skipLocked: Boolean — whether to skip jobs that are already running and holding a lock.
    • useLocks: Boolean — whether to acquire locks before writing.
    • Other flags like isDryRun, isRerun, isUndercover, checkOnlyLateData, checkOnlyNewData, etc.
    • runDate and runDateTo for specifying the date range.
    • parallelTasks, stopSparkSession, allowEmptyPipeline, alwaysAddBatchIdColumn, historicalRunMode, and sparkAppDescriptionTemplate.
  • The companion object defines string constants for config keys, including:

    • SKIP_LOCKED = "pramen.runtime.skip.locked"
    • USE_LOCK = "pramen.use.lock"
    • Other keys like DRY_RUN, IS_RERUN, RUN_TABLES, etc.
  • The fromConfig(conf: Config): RuntimeConfig method reads the configuration from the provided Config object:

    • Reads skipLocked using conf.getBoolean(SKIP_LOCKED).
    • Reads useLocks using conf.getBoolean(USE_LOCK).
    • Reads other flags similarly.
    • Uses helper methods from ConfigUtils to read optional values.
    • Converts date strings to LocalDate using convertStrToDate.
    • Throws an exception if parallelTasks is zero or negative.
    • Returns a fully populated RuntimeConfig instance.
  • The default method returns a RuntimeConfig instance with hardcoded default values, including:

    • skipLocked = false
    • useLocks = true
    • isDryRun = false
    • isRerun = false
    • parallelTasks = 1
    • stopSparkSession = true
    • historicalRunMode = RunMode.CheckUpdates
    • and others.

This file is the primary place where the runtime configuration is loaded and interpreted, including the skipLocked flag mentioned in the user comment.


Relevant snippet from pramen/core/src/main/scala/za/co/absa/pramen/core/utils/ConfigUtils.scala (lines 30-80 approx)

This utility object provides helper methods to safely read optional configuration values from a Typesafe Config object:

  • getOptionBoolean(conf: Config, path: String): Option[Boolean] — returns Some(boolean) if the path exists, else None.
  • getOptionString(conf: Config, path: String): Option[String] — returns Some(string) if the path exists, else None.
  • getOptListStrings(conf: Config, path: String): Seq[String] — returns a list of strings if the path exists, else empty list.

These methods are used in RuntimeConfig.fromConfig to read optional config keys safely.


Relevant snippet from pramen/api/src/main/scala/za/co/absa/pramen/api/RunMode.scala (lines 20-45)

Defines the RunMode sealed trait and its three case objects:

  • SkipAlreadyRan ("fill_gaps")
  • CheckUpdates ("check_updates")
  • ForceRun ("force")

Includes a method fromString to parse a string into a RunMode.

This is relevant because RuntimeConfig uses RunMode for the historicalRunMode field and parses it from config key "pramen.runtime.run.mode".


Summary

The key relevant information for the user comment about the default configuration line runtime.skip.locked = false under the pramen { } section is:

  • The RuntimeConfig object reads the skipLocked flag from the config key "pramen.runtime.skip.locked".
  • The default value for skipLocked in the default method is false.
  • The config key is expected under the pramen namespace, consistent with the user comment.
  • The fromConfig method reads this flag directly from the provided Config object, which would typically be loaded from reference.conf or other config files.

[no additional relevant snippets found]

github-actions bot commented May 28, 2025

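Unit Test Coverage

| Scope | Coverage |
|---|---|
| Overall Project | 84.92% -0.04% 🍏 |
| Files changed | 63.37% |

| Module | Coverage |
|---|---|
| pramen:core Jacoco Report | 85.8% -0.05% |

Files

| Module | File | Coverage |
|---|---|---|
| pramen:core Jacoco Report | CmdLineConfig.scala | 95.13% -0.37% 🍏 |
| pramen:core Jacoco Report | RuntimeConfig.scala | 91.48% 🍏 |
| pramen:core Jacoco Report | TaskRunnerBase.scala | 84.56% -1.25% |
| pramen:core Jacoco Report | AlreadyRunningSkipException.scala | 0% |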

yruslan merged commit 7b9d276 into main on May 28, 2025
9 checks passed
yruslan deleted the feature/598-skip-locked-jobs branch on May 28, 2025 08:25
Development

Successfully merging this pull request may close these issues.

Allow skipping jobs that hold locks to output tables
1 participant