#598 Add '--skip-locked' option so that already running jobs are skipped instead of error thrown #600

Merged
merged 3 commits into from
May 28, 2025
22 changes: 12 additions & 10 deletions README.md
@@ -3381,19 +3381,21 @@ Specify which dates to run:
| --date <yyyy-MM-dd> | `--date 2020-07-18` | Runs the pipeline as if the current date was the one specified. |
| --rerun <yyyy-MM-dd> | `--rerun 2020-07-18` | Re-runs the pipeline for the specific date. |
| --date-from <yyyy-MM-dd> --date-to <yyyy-MM-dd> | `--date-from 2020-07-01 --date-to 2020-07-18` | Runs the pipeline for a historical date range. Usually only subset of operations are selected for a historical run. |
| --run-mode { fill_gaps / check_updates / force } | `--run-mode fill_gaps` | Specifies the mode of processing historical data range. `fill_gaps` runs only jobs that haven't run before, `check_updates` runs jobs only if there are updates, `force` always runs each date in the range. |
| --inverse-order <true/false> | `--inverse-order true` | By default jobs are executed from the oldest to newest, this option allows reversing the order. For long historical jobs this can help make newest data available as soon as possible. |
| --run-mode <fill_gaps \| check_updates \| force> | `--run-mode fill_gaps` | Specifies the mode of processing historical data range. `fill_gaps` runs only jobs that haven't run before, `check_updates` runs jobs only if there are updates, `force` always runs each date in the range. |
| --inverse-order <true \| false> | `--inverse-order true` | By default jobs are executed from the oldest to newest, this option allows reversing the order. For long historical jobs this can help make newest data available as soon as possible. |

Execution options:

| Argument | Example | Description |
|----------------------|-----------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| --dry-run | `--dry-run` | If specified the pipeline won't be executed. Pramen will print which job it would have run. |
| --verbose | `--verbose` | If specified, application logs will include more Spark execution details. |
| --override-log-level | `--override-log-level INFO` | Overrides environment configured root log level. |
| --check-late-only | `--check-late-only` | If specified, Pramen will do only late data checks and checks for retrospective updates. It won't run jobs that are not yet late. Useful for catch-up job schedules. |
| --check-new-only | `--check-new-only` | If specified, Pramen will not check for late and updated data and will run only jobs scheduled for the current date. |
| --undercover | `--undercover` | If specified, Pramen will not update bookkeeper so any changes caused by the pipeline won't be recorded. Useful for re-running historical transformations without triggering execution of the rest of the pipeline. |
| Argument | Example | Description |
|----------------------------|-----------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| --dry-run | `--dry-run` | If specified the pipeline won't be executed. Pramen will print which job it would have run. |
| --verbose | `--verbose` | If specified, application logs will include more Spark execution details. |
| --override-log-level | `--override-log-level INFO` | Overrides environment configured root log level. |
| --check-late-only | `--check-late-only` | If specified, Pramen will do only late data checks and checks for retrospective updates. It won't run jobs that are not yet late. Useful for catch-up job schedules. |
| --check-new-only | `--check-new-only` | If specified, Pramen will not check for late and updated data and will run only jobs scheduled for the current date. |
| --undercover | `--undercover` | If specified, Pramen will not update bookkeeper so any changes caused by the pipeline won't be recorded. Useful for re-running historical transformations without triggering execution of the rest of the pipeline. |
| --use-lock <true \| false> | `--use-lock true` | If true (default) a lock will be used to protect against parallel writes to the same partition. Bookkeeping storage or database will be used for locking across pipelines. |
| --skip-locked | `--skip-locked` | If specified, jobs that are already running (holding a lock) will be skipped. Otherwise, an error will be thrown. |

### Command line examples

3 changes: 3 additions & 0 deletions pramen/core/src/main/resources/reference.conf
@@ -93,6 +93,9 @@ pramen {

use.lock = true

# If true, jobs that are already running and hold a lock will be skipped instead of throwing an error
runtime.skip.locked = false

# Send an email even if there are no changes and no late or not ready data
email.if.no.changes = true

@@ -33,6 +33,7 @@ case class RuntimeConfig(
runTables: Seq[String], // Specifies which operations (identified by output table names) to run
isUndercover: Boolean, // If true, no bookkeeping will be done for the job
useLocks: Boolean, // If true, the pipeline will acquire locks before writing to a metastore table
skipLocked: Boolean, // If true, jobs that are already running and holding a lock will be skipped instead of throwing an error
checkOnlyLateData: Boolean,
checkOnlyNewData: Boolean,
emailIfNoChanges: Boolean,
@@ -59,6 +60,7 @@ object RuntimeConfig {
val RUN_TABLES = "pramen.runtime.run.tables"
val UNDERCOVER = "pramen.undercover"
val USE_LOCK = "pramen.use.lock"
val SKIP_LOCKED = "pramen.runtime.skip.locked"
@coderabbitai coderabbitai bot May 28, 2025

💡 Verification agent

🧩 Analysis chain

Verify configuration key consistency.

The configuration key looks correct, but ensure it matches the key defined in reference.conf.


🏁 Script executed:

#!/bin/bash
# Description: Verify the configuration key exists in reference.conf
# Expected: Find the pramen.runtime.skip.locked key with default value false

fd -e conf reference.conf --exec grep -n "pramen.runtime.skip.locked"

Length of output: 69


🏁 Script executed:

#!/bin/bash
# Locate reference.conf in the repository
file=$(fd -t f -I reference.conf)
echo "Found reference.conf: ${file:-<none>}"

if [[ -n "$file" ]]; then
  echo "Searching for pramen.runtime.skip.locked in $file"
  grep -n "pramen.runtime.skip.locked" "$file" || echo "Key not found in $file"
else
  echo "Error: reference.conf not found"
fi

Length of output: 1713


🏁 Script executed:

#!/bin/bash
echo "Locating all reference.conf files..."
files=$(fd -t f -I reference.conf)

if [[ -z "$files" ]]; then
  echo "Error: No reference.conf files found."
  exit 1
fi

for f in $files; do
  echo "Checking $f"
  grep -Hn "pramen.runtime.skip.locked" "$f" || echo "Key 'pramen.runtime.skip.locked' not found in $f"
done

Length of output: 1682


Missing configuration key in reference.conf

The constant SKIP_LOCKED = "pramen.runtime.skip.locked" isn’t defined in any of your reference.conf files. Please add this key (with a default value) so that the setting is recognized at runtime.

Locations to update:

  • pramen/core/src/main/resources/reference.conf
  • (optionally) pramen/core/src/test/resources/reference.conf if tests load this file
  • pramen/extras/src/main/resources/reference.conf if the “extras” module uses it

Suggested diff for pramen/core/src/main/resources/reference.conf:

@@ # Runtime settings
-# (other pramen.runtime.* entries)
+pramen.runtime.skip.locked = false  # Skip locked resources during runtime

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In
pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala
at line 63, the configuration key SKIP_LOCKED is defined as
"pramen.runtime.skip.locked" but this key is missing from all reference.conf
files. To fix this, add the key "pramen.runtime.skip.locked" with a default
value (e.g., false) to pramen/core/src/main/resources/reference.conf, and
optionally to pramen/core/src/test/resources/reference.conf and
pramen/extras/src/main/resources/reference.conf if those modules use it,
ensuring the runtime configuration recognizes this setting.

Collaborator Author

The default configuration is there in reference.conf. The line is 'runtime.skip.locked = false'. It is under 'pramen { }' section.

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for the nested configuration structure in reference.conf files
echo "Searching for runtime.skip.locked in pramen section..."

files=$(fd -t f -I reference.conf)

for f in $files; do
  echo "=== Checking $f ==="
  
  # Look for the nested structure - search for 'runtime.skip.locked' 
  grep -A5 -B5 "runtime\.skip\.locked" "$f" || echo "Pattern 'runtime.skip.locked' not found"
  
  # Also search for just 'skip.locked' to catch variations
  grep -A5 -B5 "skip\.locked" "$f" || echo "Pattern 'skip.locked' not found"
  
  # Show the pramen section context
  echo "--- Showing pramen section context ---"
  sed -n '/^pramen {/,/^}/p' "$f" | head -20
  
done

Length of output: 3819


@yruslan You're absolutely right, and I apologize for the confusion! The configuration key runtime.skip.locked = false is indeed present in pramen/core/src/main/resources/reference.conf under the pramen { } section, exactly as you stated.

I can see from the search results that it's properly configured:

pramen {
  # ...
  use.lock = true
  
  # If true, jobs that are already running and hold a lock will be skipped instead of throwing an error
  runtime.skip.locked = false
  # ...
}

My initial analysis was incorrect - I was searching for the fully qualified dotted notation "pramen.runtime.skip.locked" but HOCON configuration files use nested structures. When nested under the pramen { } section as runtime.skip.locked = false, it's correctly accessible via the flat key "pramen.runtime.skip.locked" that your RuntimeConfig.fromConfig method uses.

Thank you for the correction! The configuration setup is working as intended.
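
The nesting behaviour discussed in this thread can be checked directly with Typesafe Config. This is a minimal sketch rather than code from the PR: the object name `HoconNestingSketch` and the inline config string are illustrative assumptions, and it presumes the Typesafe Config library is on the classpath. It shows why a plain text search for the fully qualified key finds nothing while the parsed config still resolves it.

```scala
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical demo object, not part of Pramen.
object HoconNestingSketch {
  val key = "pramen.runtime.skip.locked"

  // Same shape as the reference.conf fragment quoted in this thread.
  val nestedSource: String =
    """pramen {
      |  use.lock = true
      |  runtime.skip.locked = false
      |}""".stripMargin

  val nested: Config = ConfigFactory.parseString(nestedSource)

  def main(args: Array[String]): Unit = {
    // A grep-style search for the flat key does not match the nested text...
    println(s"text contains flat key: ${nestedSource.contains(key)}")
    // ...but the parsed config exposes the value under the flat path.
    println(s"config has path:        ${nested.hasPath(key)}")
    println(s"skip.locked value:      ${nested.getBoolean(key)}")
  }
}
```

Searching for the unqualified `runtime.skip.locked`, as the follow-up script does, avoids the false negative.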

🐰
(\ /)
( ..)
o
(")(") learns something new


🧬 Code Graph Analysis Results

Relevant snippet from pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/RuntimeConfig.scala (lines 1-153)

This file defines the RuntimeConfig case class and its companion object which handles loading runtime configuration from a Typesafe Config object.

  • The RuntimeConfig case class contains many fields controlling runtime behavior, including:

    • skipLocked: Boolean — whether to skip jobs that are already running and holding a lock.
    • useLocks: Boolean — whether to acquire locks before writing.
    • Other flags like isDryRun, isRerun, isUndercover, checkOnlyLateData, checkOnlyNewData, etc.
    • runDate and runDateTo for specifying the date range.
    • parallelTasks, stopSparkSession, allowEmptyPipeline, alwaysAddBatchIdColumn, historicalRunMode, and sparkAppDescriptionTemplate.
  • The companion object defines string constants for config keys, including:

    • SKIP_LOCKED = "pramen.runtime.skip.locked"
    • USE_LOCK = "pramen.use.lock"
    • Other keys like DRY_RUN, IS_RERUN, RUN_TABLES, etc.
  • The fromConfig(conf: Config): RuntimeConfig method reads the configuration from the provided Config object:

    • Reads skipLocked using conf.getBoolean(SKIP_LOCKED).
    • Reads useLocks using conf.getBoolean(USE_LOCK).
    • Reads other flags similarly.
    • Uses helper methods from ConfigUtils to read optional values.
    • Converts date strings to LocalDate using convertStrToDate.
    • Throws an exception if parallelTasks is zero or negative.
    • Returns a fully populated RuntimeConfig instance.
  • The default method returns a RuntimeConfig instance with hardcoded default values, including:

    • skipLocked = false
    • useLocks = true
    • isDryRun = false
    • isRerun = false
    • parallelTasks = 1
    • stopSparkSession = true
    • historicalRunMode = RunMode.CheckUpdates
    • and others.

This file is the primary place where the runtime configuration is loaded and interpreted, including the skipLocked flag mentioned in the user comment.


Relevant snippet from pramen/core/src/main/scala/za/co/absa/pramen/core/utils/ConfigUtils.scala (lines 30-80 approx)

This utility object provides helper methods to safely read optional configuration values from a Typesafe Config object:

  • getOptionBoolean(conf: Config, path: String): Option[Boolean] — returns Some(boolean) if the path exists, else None.
  • getOptionString(conf: Config, path: String): Option[String] — returns Some(string) if the path exists, else None.
  • getOptListStrings(conf: Config, path: String): Seq[String] — returns a list of strings if the path exists, else empty list.

These methods are used in RuntimeConfig.fromConfig to read optional config keys safely.
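
For illustration, helpers with the signatures listed above could look roughly like the sketch below. The bodies are an assumption based only on the described behaviour ("Some if the path exists, else None"); the real `ConfigUtils` implementation may differ, and `OptionalConfigSketch` is a hypothetical name.

```scala
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical stand-in for the optional-read helpers described above.
object OptionalConfigSketch {
  // Returns Some(value) when the path is present, None otherwise.
  def getOptionBoolean(conf: Config, path: String): Option[Boolean] =
    if (conf.hasPath(path)) Some(conf.getBoolean(path)) else None

  def getOptionString(conf: Config, path: String): Option[String] =
    if (conf.hasPath(path)) Some(conf.getString(path)) else None

  def main(args: Array[String]): Unit = {
    val conf = ConfigFactory.parseString("pramen.runtime.skip.locked = true")
    println(getOptionBoolean(conf, "pramen.runtime.skip.locked"))
    println(getOptionBoolean(ConfigFactory.empty, "pramen.runtime.skip.locked"))
  }
}
```

Note that `skipLocked` itself is read with a plain `conf.getBoolean(SKIP_LOCKED)`, which works without an optional wrapper because `reference.conf` supplies the default.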


Relevant snippet from pramen/api/src/main/scala/za/co/absa/pramen/api/RunMode.scala (lines 20-45)

Defines the RunMode sealed trait and its three case objects:

  • SkipAlreadyRan ("fill_gaps")
  • CheckUpdates ("check_updates")
  • ForceRun ("force")

Includes a method fromString to parse a string into a RunMode.

This is relevant because RuntimeConfig uses RunMode for the historicalRunMode field and parses it from config key "pramen.runtime.run.mode".
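
A sealed trait with three case objects and a `fromString` parser, as summarized above, might be sketched as follows. Only the object names and their string values come from the summary; the `name` member, the trimming and lowercasing of input, and the error message are assumptions made for this example and may not match the actual `RunMode.scala`.

```scala
// Sketch of a run-mode ADT; member names other than the case objects are assumed.
sealed trait RunMode {
  def name: String
}

object RunMode {
  case object SkipAlreadyRan extends RunMode { val name = "fill_gaps" }
  case object CheckUpdates extends RunMode { val name = "check_updates" }
  case object ForceRun extends RunMode { val name = "force" }

  private val all: Seq[RunMode] = Seq(SkipAlreadyRan, CheckUpdates, ForceRun)

  // Parses the value passed to --run-mode; input normalization is an assumption.
  def fromString(s: String): RunMode = {
    val normalized = s.trim.toLowerCase
    all.find(_.name == normalized).getOrElse(
      throw new IllegalArgumentException(
        s"Unknown run mode '$s'. Expected one of: ${all.map(_.name).mkString(", ")}"))
  }
}
```

Because the trait is sealed, the compiler can warn about non-exhaustive matches when code branches on the historical run mode.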


Summary

The key relevant information for the user comment about the default configuration line runtime.skip.locked = false under the pramen { } section is:

  • The RuntimeConfig object reads the skipLocked flag from the config key "pramen.runtime.skip.locked".
  • The default value for skipLocked in the default method is false.
  • The config key is expected under the pramen namespace, consistent with the user comment.
  • The fromConfig method reads this flag directly from the provided Config object, which would typically be loaded from reference.conf or other config files.

[no additional relevant snippets found]

val CHECK_ONLY_LATE_DATA = "pramen.check.only.late.data"
val CHECK_ONLY_NEW_DATA = "pramen.check.only.new.data"
val EMAIL_IF_NO_CHANGES = "pramen.email.if.no.changes"
@@ -141,6 +143,7 @@ object RuntimeConfig {
runTables = ConfigUtils.getOptListStrings(conf, RUN_TABLES),
isUndercover = isUndercover,
useLocks = conf.getBoolean(USE_LOCK),
skipLocked = conf.getBoolean(SKIP_LOCKED),
checkOnlyLateData = checkOnlyLateData,
checkOnlyNewData = checkOnlyNewData,
emailIfNoChanges = conf.getBoolean(EMAIL_IF_NO_CHANGES),
@@ -163,6 +166,7 @@ object RuntimeConfig {
runTables = Seq.empty,
isUndercover = false,
useLocks = true,
skipLocked = false,
checkOnlyLateData = false,
checkOnlyNewData = true,
emailIfNoChanges = false,
@@ -38,6 +38,7 @@ case class CmdLineConfig(
parallelTasks: Option[Int] = None,
dryRun: Option[Boolean] = None,
useLock: Option[Boolean] = None,
skipLocked: Option[Boolean] = None,
undercover: Option[Boolean] = None,
dateFrom: Option[LocalDate] = None,
dateTo: Option[LocalDate] = None,
@@ -110,6 +111,9 @@ object CmdLineConfig {
for (useLock <- cmd.useLock)
accumulatedConfig = accumulatedConfig.withValue(USE_LOCK, ConfigValueFactory.fromAnyRef(useLock))

for (skipLocked <- cmd.skipLocked)
accumulatedConfig = accumulatedConfig.withValue(SKIP_LOCKED, ConfigValueFactory.fromAnyRef(skipLocked))

for (inverseOrder <- cmd.inverseOrder)
accumulatedConfig = accumulatedConfig.withValue(IS_INVERSE_ORDER, ConfigValueFactory.fromAnyRef(inverseOrder))

@@ -206,6 +210,10 @@ object CmdLineConfig {
config.copy(useLock = Option(value)))
.text("If true (default) a lock is used when writing to a table")

opt[Unit]("skip-locked").optional().action((value, config) =>
config.copy(skipLocked = Some(true)))
.text("If true Pramen will skip jobs that are already running (locked) and won't consider it an error.")

opt[Unit]("undercover").optional().action((_, config) =>
config.copy(undercover = Some(true)))
.text("If true, no updates will be done to the bookkeeping data (Ensure you are know what you are doing!)")
@@ -0,0 +1,20 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.core.runner.task

/** This exception is used only for gracefully skipping jobs that are already running. */
class AlreadyRunningSkipException extends RuntimeException("Another instance is already running")
@@ -333,9 +333,14 @@ abstract class TaskRunnerBase(conf: Config,

val attempt = try {
Try {
if (!isTransient) {
if (runtimeConfig.useLocks && !lock.tryAcquire())
throw new IllegalStateException(s"Another instance is already running for ${task.job.outputTable.name} for ${task.infoDate}")
if (!isTransient && runtimeConfig.useLocks) {
if (!lock.tryAcquire()) {
if (runtimeConfig.skipLocked) {
throw new AlreadyRunningSkipException()
} else {
throw new IllegalStateException(s"Another instance is already running for ${task.job.outputTable.name} for ${task.infoDate}")
}
}
}

val recordCountOldOpt = bookkeeper.getLatestDataChunk(task.job.outputTable.name, task.infoDate, task.infoDate).map(_.outputRecordCount)
@@ -442,6 +447,17 @@ abstract class TaskRunnerBase(conf: Config,
attempt match {
case Success(result) =>
result
case Failure(ex) if ex.isInstanceOf[AlreadyRunningSkipException] =>
TaskResult(task.job.taskDef,
RunStatus.Skipped("Another instance is already running", isWarning = true),
getRunInfo(task.infoDate, started),
applicationId,
isTransient,
isRawFileBased,
Nil,
validationResult.dependencyWarnings,
Seq.empty,
task.job.operation.extraOptions)
case Failure(ex) =>
TaskResult(task.job.taskDef,
RunStatus.Failed(ex),
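
The control flow in the two `TaskRunnerBase` hunks above (throw a marker exception when the lock is held, then map that exception to a skipped status instead of a failure) can be reduced to a small standalone sketch. Everything here is simplified for illustration: `Status`, `runTask`, and the `tryAcquire` function parameter are hypothetical stand-ins, not Pramen types, and the real code builds a full `TaskResult`.

```scala
import scala.util.{Failure, Success, Try}

// Simplified stand-in for the skip-on-lock flow; not the actual Pramen classes.
object SkipLockedSketch {
  sealed trait Status
  case object Succeeded extends Status
  final case class Skipped(reason: String) extends Status
  final case class Failed(cause: Throwable) extends Status

  // Marker exception used only to signal a graceful skip.
  class AlreadyRunningSkipException
    extends RuntimeException("Another instance is already running")

  def runTask(useLocks: Boolean, skipLocked: Boolean, tryAcquire: () => Boolean)
             (body: => Unit): Status = {
    val attempt = Try {
      // The lock is attempted only when locking is enabled.
      if (useLocks && !tryAcquire()) {
        if (skipLocked) throw new AlreadyRunningSkipException
        else throw new IllegalStateException("Another instance is already running")
      }
      body
    }

    attempt match {
      case Success(_)                              => Succeeded
      case Failure(_: AlreadyRunningSkipException) => Skipped("Another instance is already running")
      case Failure(ex)                             => Failed(ex)
    }
  }
}
```

With this shape, `--skip-locked` changes only how a held lock is reported; any other exception raised by the task body still ends up as a failure.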
@@ -28,6 +28,7 @@ object RuntimeConfigFactory {
runTables: Seq[String] = Seq.empty[String],
isUndercover: Boolean = false,
useLocks: Boolean = false,
skipLocked: Boolean = false,
checkOnlyLateData: Boolean = false,
checkOnlyNewData: Boolean = false,
emailIfNoChanges: Boolean = false,
@@ -45,6 +46,7 @@ object RuntimeConfigFactory {
runTables,
isUndercover,
useLocks,
skipLocked,
checkOnlyLateData,
checkOnlyNewData,
emailIfNoChanges,
@@ -25,7 +25,6 @@ import za.co.absa.pramen.core.config.Keys.LOG_EFFECTIVE_CONFIG
import za.co.absa.pramen.core.utils.ConfigUtils

class CmdLineConfigSuite extends AnyWordSpec {

private val emptyConfig = ConfigFactory.empty
private val populatedConfig = ConfigFactory.parseString(
s"""$DRY_RUN = false
@@ -290,6 +289,22 @@ class CmdLineConfigSuite extends AnyWordSpec {
assert(!config.getBoolean(USE_LOCK))
}

"return the modified config if skipLocked is specified" in {
val cmd = CmdLineConfig.parseCmdLine(Array("--workflow", "dummy.config", "--skip-locked"))
val config = CmdLineConfig.applyCmdLineToConfig(populatedConfig, cmd.get)

assert(config.hasPath(SKIP_LOCKED))

assert(config.getBoolean(SKIP_LOCKED))
}

"return the modified config if skipLocked is not specified" in {
val cmd = CmdLineConfig.parseCmdLine(Array("--workflow", "dummy.config"))
val config = CmdLineConfig.applyCmdLineToConfig(populatedConfig, cmd.get)

assert(!config.hasPath(SKIP_LOCKED))
}

"return the original config if undercover is not specified" in {
val cmd = CmdLineConfig.parseCmdLine(Array("--workflow", "dummy.config"))
val config = CmdLineConfig.applyCmdLineToConfig(populatedConfig, cmd.get)